Qore Programming Language 2.1.1
Loading...
Searching...
No Matches
qore_socket_private.h
1/* -*- mode: c++; indent-tabs-mode: nil -*- */
2/*
3 qore_socket_private.h
4
5 Qore Programming Language
6
7 Copyright (C) 2003 - 2024 Qore Technologies, s.r.o.
8
9 Permission is hereby granted, free of charge, to any person obtaining a
10 copy of this software and associated documentation files (the "Software"),
11 to deal in the Software without restriction, including without limitation
12 the rights to use, copy, modify, merge, publish, distribute, sublicense,
13 and/or sell copies of the Software, and to permit persons to whom the
14 Software is furnished to do so, subject to the following conditions:
15
16 The above copyright notice and this permission notice shall be included in
17 all copies or substantial portions of the Software.
18
19 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
20 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
21 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
22 AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
23 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
24 FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
25 DEALINGS IN THE SOFTWARE.
26
27 Note that the Qore library is released under a choice of three open-source
28 licenses: MIT (as above), LGPL 2+, or GPL 2+; see README-LICENSE for more
29 information.
30*/
31
32#ifndef _QORE_QORE_SOCKET_PRIVATE_H
33#define _QORE_QORE_SOCKET_PRIVATE_H
34
35#include "qore/AbstractPollState.h"
36#include "qore/QoreSocket.h"
37#include "qore/InputStream.h"
38#include "qore/OutputStream.h"
39
40#include "qore/intern/SSLSocketHelper.h"
41#include "qore/intern/QC_Queue.h"
42
43#include <cctype>
44#include <cctype>
45#include <cerrno>
46#include <cstdlib>
47#include <cstring>
48#include <strings.h>
49
50#include <openssl/ssl.h>
51#include <openssl/err.h>
52
53#if defined HAVE_POLL
54#include <poll.h>
55#elif defined HAVE_SYS_SELECT_H
56#include <sys/select.h>
57#elif (defined _WIN32 || defined __WIN32__) && ! defined __CYGWIN__
58#define HAVE_SELECT 1
59#else
60#error no async socket I/O APIs available
61#endif
62
63#ifndef DEFAULT_SOCKET_BUFSIZE
64#define DEFAULT_SOCKET_BUFSIZE (64 * 1024)
65#endif
66
67#ifndef QORE_MAX_HEADER_SIZE
68#define QORE_MAX_HEADER_SIZE 16384
69#endif
70
71#define CHF_HTTP11 (1 << 0)
72#define CHF_PROCESS (1 << 1)
73#define CHF_REQUEST (1 << 2)
74
75#ifndef DEFAULT_SOCKET_MIN_THRESHOLD_BYTES
76#define DEFAULT_SOCKET_MIN_THRESHOLD_BYTES 1024
77#endif
78
79static constexpr int SOCK_POLLIN = (1 << 0);
80static constexpr int SOCK_POLLOUT = (1 << 1);
81static constexpr int SOCK_POLLERR = (1 << 2);
82
83DLLLOCAL void concat_target(QoreString& str, const struct sockaddr *addr, const char* type = "target");
84DLLLOCAL int do_read_error(ssize_t rc, const char* method_name, int timeout_ms, ExceptionSink* xsink);
85DLLLOCAL int sock_get_raw_error();
86DLLLOCAL int sock_get_error();
87DLLLOCAL void qore_socket_error(ExceptionSink* xsink, const char* err, const char* cdesc, const char* mname = nullptr,
88 const char* host = nullptr, const char* svc = nullptr, const struct sockaddr *addr = nullptr);
89DLLLOCAL void qore_socket_error_intern(int rc, ExceptionSink* xsink, const char* err, const char* cdesc,
90 const char* mname = nullptr, const char* host = nullptr, const char* svc = nullptr,
91 const struct sockaddr* addr = nullptr);
92DLLLOCAL void se_in_op(const char* cname, const char* meth, ExceptionSink* xsink);
93DLLLOCAL void se_in_op_thread(const char* cname, const char* meth, ExceptionSink* xsink);
94DLLLOCAL void se_not_open(const char* cname, const char* meth, ExceptionSink* xsink, const char* extra = nullptr);
95DLLLOCAL void se_timeout(const char* cname, const char* meth, int timeout_ms, ExceptionSink* xsink,
96 const char* extra = nullptr);
97DLLLOCAL void se_closed(const char* cname, const char* mname, ExceptionSink* xsink);
98
99class Transform;
100
101#ifdef _Q_WINDOWS
102#define GETSOCKOPT_ARG_4 char*
103#define SETSOCKOPT_ARG_4 const char*
104#define SHUTDOWN_ARG SD_BOTH
105#define QORE_INVALID_SOCKET ((int)INVALID_SOCKET)
106#define QORE_SOCKET_ERROR SOCKET_ERROR
107DLLLOCAL int check_windows_rc(int rc);
108DLLLOCAL int windows_set_errno();
109
110#ifndef ECONNRESET
111#define ECONNRESET WSAECONNRESET
112#endif
113
114#else
115// UNIX/Cygwin
116#define GETSOCKOPT_ARG_4 void*
117#define SETSOCKOPT_ARG_4 void*
118#define SHUTDOWN_ARG SHUT_RDWR
119#define QORE_INVALID_SOCKET -1
120#define QORE_SOCKET_ERROR -1
121#endif
122
123template <typename T>
124class PrivateDataListHolder {
125public:
126 DLLLOCAL PrivateDataListHolder(ExceptionSink* xsink) : xsink(xsink) {
127 }
128
129 DLLLOCAL ~PrivateDataListHolder() {
130 for (auto& i : pd_vec)
131 i->deref(xsink);
132 }
133
134 DLLLOCAL T* add(const QoreObject* o, qore_classid_t cid) {
135 T* pd = static_cast<T*>(o->getReferencedPrivateData(cid, xsink));
136 if (!pd)
137 return nullptr;
138 pd_vec.push_back(pd);
139 return pd;
140 }
141
142private:
143 typedef std::vector<T*> pd_vec_t;
144 pd_vec_t pd_vec;
145 ExceptionSink* xsink;
146};
147
148hashdecl qore_socketsource_private {
149 QoreStringNode* address;
150 QoreStringNode* hostname;
151
152 DLLLOCAL qore_socketsource_private() : address(0), hostname(0) {
153 }
154
155 DLLLOCAL ~qore_socketsource_private() {
156 if (address) address->deref();
157 if (hostname) hostname->deref();
158 }
159
160 DLLLOCAL void setAddress(QoreStringNode* addr) {
161 assert(!address);
162 address = addr;
163 }
164
165 DLLLOCAL void setAddress(const char* addr) {
166 assert(!address);
167 address = new QoreStringNode(addr);
168 }
169
170 DLLLOCAL void setHostName(const char* host) {
171 assert(!hostname);
172 hostname = new QoreStringNode(host);
173 }
174
175 DLLLOCAL void setAll(QoreObject* o, ExceptionSink* xsink) {
176 if (address) {
177 o->setValue("source", address, xsink);
178 address = 0;
179 }
180
181 if (hostname) {
182 o->setValue("source_host", hostname, xsink);
183 hostname = 0;
184 }
185 }
186};
187
188class OptionalNonBlockingHelper {
189public:
190 qore_socket_private& sock;
191 ExceptionSink* xsink;
192 bool set;
193
194 DLLLOCAL OptionalNonBlockingHelper(qore_socket_private& s, bool n_set, ExceptionSink* xs);
195 DLLLOCAL ~OptionalNonBlockingHelper();
196};
197
198class PrivateQoreSocketTimeoutBase {
199public:
200 DLLLOCAL PrivateQoreSocketTimeoutBase(qore_socket_private* s) : sock(s), start(sock ? q_clock_getmicros() : 0) {
201 }
202
203protected:
204 hashdecl qore_socket_private* sock;
205 int64 start;
206};
207
208class PrivateQoreSocketTimeoutHelper : public PrivateQoreSocketTimeoutBase {
209public:
210 DLLLOCAL PrivateQoreSocketTimeoutHelper(qore_socket_private* s, const char* op);
211 DLLLOCAL ~PrivateQoreSocketTimeoutHelper();
212
213protected:
214 const char* op;
215};
216
217class PrivateQoreSocketThroughputHelper : public PrivateQoreSocketTimeoutBase {
218public:
219 DLLLOCAL PrivateQoreSocketThroughputHelper(qore_socket_private* s, bool snd);
220 DLLLOCAL ~PrivateQoreSocketThroughputHelper();
221
222 DLLLOCAL void finalize(int64 bytes);
223
224protected:
225 bool send;
226};
227
228hashdecl qore_socket_private;
229
230hashdecl qore_socket_op_helper {
231protected:
232 qore_socket_private* s;
233
234public:
235 DLLLOCAL qore_socket_op_helper(qore_socket_private* sock);
236 DLLLOCAL ~qore_socket_op_helper();
237};
238
239class SSLSocketHelperHelper {
240protected:
241 qore_socket_private* s;
242 SSLSocketHelper* ssl;
243 bool context_saved = false;
244
245public:
246 DLLLOCAL SSLSocketHelperHelper(qore_socket_private* sock, bool set_thread_context = false);
247
248 DLLLOCAL ~SSLSocketHelperHelper();
249
250 DLLLOCAL void error();
251};
252
253constexpr int SCIPS_CONNECT = 0;
254constexpr int SCIPS_CHECK_CONNECT = 1;
255
256class SocketConnectInetPollState : public AbstractPollState {
257public:
258 DLLLOCAL SocketConnectInetPollState(ExceptionSink* xsink, qore_socket_private* sock, const char* host,
259 const char* service, int family = AF_UNSPEC, int type = SOCK_STREAM, int protocol = 0);
260
267 DLLLOCAL virtual int continuePoll(ExceptionSink* xsink);
268
269private:
270 QoreAddrInfo ai;
271 qore_socket_private* sock;
272 std::string host, service;
273 hashdecl addrinfo* p = nullptr;
274 int prt = -1;
275 int state = SCIPS_CONNECT;
276
277 DLLLOCAL int doConnect(ExceptionSink* xsink);
278
279 // returns 0 = connected, 1 = try again, -1 = error
280 DLLLOCAL int checkConnection(ExceptionSink* xsink);
281
283 DLLLOCAL int next(ExceptionSink* xsink);
284
286 DLLLOCAL int nextIntern(ExceptionSink* xsink);
287};
288
289#ifndef _Q_WINDOWS
290class SocketConnectUnixPollState : public AbstractPollState {
291public:
292 DLLLOCAL SocketConnectUnixPollState(ExceptionSink* xsink, qore_socket_private* sock, const char* name,
293 int type = SOCK_STREAM, int protocol = 0);
294
301 DLLLOCAL virtual int continuePoll(ExceptionSink* xsink);
302
303private:
304 qore_socket_private* sock;
305 std::string name;
306 hashdecl sockaddr_un addr;
307 int state = SCIPS_CONNECT;
308
309 DLLLOCAL int doConnect(ExceptionSink* xsink);
310
311 // returns 0 = connected, 1 = try again, -1 = error
312 DLLLOCAL int checkConnection(ExceptionSink* xsink);
313};
314#endif
315
316class SocketConnectSslPollState : public AbstractPollState {
317public:
318 DLLLOCAL SocketConnectSslPollState(ExceptionSink* xsink, qore_socket_private* sock,
319 QoreSSLCertificate* cert = nullptr, QoreSSLPrivateKey* pkey = nullptr);
320
327 DLLLOCAL virtual int continuePoll(ExceptionSink* xsink);
328
329private:
330 qore_socket_private* sock;
331
332 // returns 0 = connected, 1 = try again, -1 = error
333 DLLLOCAL int checkConnection(ExceptionSink* xsink);
334};
335
336class SocketAcceptPollState : public AbstractPollState {
337public:
338 DLLLOCAL SocketAcceptPollState(ExceptionSink* xsink, qore_socket_private* sock);
339
346 DLLLOCAL virtual int continuePoll(ExceptionSink* xsink);
347
349 DLLLOCAL int getDescriptor() const {
350 return descriptor;
351 }
352
353private:
354 qore_socket_private* sock;
355 int descriptor = -1;
356};
357
358class SocketAcceptSslPollState : public AbstractPollState {
359public:
360 DLLLOCAL SocketAcceptSslPollState(ExceptionSink* xsink, qore_socket_private* sock,
361 QoreSSLCertificate* cert = nullptr, QoreSSLPrivateKey* pkey = nullptr);
362
369 DLLLOCAL virtual int continuePoll(ExceptionSink* xsink);
370
371private:
372 qore_socket_private* sock;
373};
374
375class SocketSendPollState : public AbstractPollState {
376public:
377 DLLLOCAL SocketSendPollState(ExceptionSink* xsink, qore_socket_private* sock, const char* data, size_t size);
378
385 DLLLOCAL virtual int continuePoll(ExceptionSink* xsink);
386
388 DLLLOCAL size_t getBytesSent() const {
389 return sent;
390 }
391
392private:
393 qore_socket_private* sock;
394 const char* data;
395 size_t size;
396 size_t sent = 0;
397};
398
399class SocketRecvPollState : public AbstractPollState {
400public:
401 DLLLOCAL SocketRecvPollState(ExceptionSink* xsink, qore_socket_private* sock, size_t size);
402
409 DLLLOCAL virtual int continuePoll(ExceptionSink* xsink);
410
412 DLLLOCAL virtual QoreValue takeOutput() {
413 QoreValue rv = bin.release();
414 bin = nullptr;
415 return rv;
416 }
417
419 DLLLOCAL size_t getBytesReceived() const {
420 return received;
421 }
422
423private:
424 qore_socket_private* sock;
426 size_t size;
427 size_t received = 0;
428};
429
430class SocketRecvPacketPollState : public AbstractPollState {
431public:
432 DLLLOCAL SocketRecvPacketPollState(ExceptionSink* xsink, qore_socket_private* sock);
433
440 DLLLOCAL virtual int continuePoll(ExceptionSink* xsink);
441
443 DLLLOCAL virtual QoreValue takeOutput() {
444 QoreValue rv = bin.release();
445 bin = nullptr;
446 return rv;
447 }
448
450 DLLLOCAL size_t getBytesReceived() const {
451 return bin ? bin->size() : 0;
452 }
453
454private:
455 qore_socket_private* sock;
457 bool io = false;
458};
459
460class SocketRecvUntilBytesPollState : public AbstractPollState {
461public:
462 DLLLOCAL SocketRecvUntilBytesPollState(ExceptionSink* xsink, qore_socket_private* sock, const char* bytes,
463 size_t size);
464
471 DLLLOCAL virtual int continuePoll(ExceptionSink* xsink);
472
474 DLLLOCAL virtual QoreValue takeOutput() {
475 size_t len = bin->size();
476 BinaryNode* rv = new BinaryNode(bin->giveBuffer(), len);
477 bin = nullptr;
478 return rv;
479 }
480
482 DLLLOCAL size_t getBytesReceived() const {
483 return bin ? bin->size() : 0;
484 }
485
486private:
487 qore_socket_private* sock;
488 // we are using QoreStringNode as it has a much better append / concat implementation than BinaryNode
490 const char* bytes;
491 size_t size;
492 size_t matched = 0;
493
494 DLLLOCAL int doRecv(ExceptionSink* xsink);
495};
496
497
498hashdecl qore_socket_private {
499 friend class PrivateQoreSocketTimeoutHelper;
500 friend class PrivateQoreSocketThroughputHelper;
501 friend class SocketConnectInetPollState;
502
503 // for client certificate capture
504 static thread_local qore_socket_private* current_socket;
505
506 int sock, sfamily, port, stype, sprot;
507
508 // issue #3558: connection sequence to show when a connection has been reestablished
509 int64 connection_id = 0;
510
511 const QoreEncoding* enc;
512
513 std::string socketname;
514 // issue #3053: client target for SNI
515 std::string client_target;
516 SSLSocketHelper* ssl = nullptr;
517 Queue* event_queue = nullptr,
518 * warn_queue = nullptr;
519
520 // issue #3633: HTTP encoding to assume
521 std::string assume_http_encoding = "ISO-8859-1";
522
523 // socket buffer for buffered reads
524 char rbuf[DEFAULT_SOCKET_BUFSIZE];
525
526 // current buffer size
527 size_t buflen = 0,
528 bufoffset = 0;
529
530 int64 tl_warning_us = 0; // timeout threshold for network action warning in microseconds
531 double tp_warning_bs = 0; // throughput warning threshold in B/s
532 int64 tp_bytes_sent = 0, // throughput: bytes sent
533 tp_bytes_recv = 0, // throughput: bytes received
534 tp_us_sent = 0, // throughput: time sending
535 tp_us_recv = 0, // throughput: time receiving
536 tp_us_min = 0 // throughput: minimum time for transfer to be considered
537 ;
538
540 QoreValue warn_callback_arg;
542 QoreValue event_arg;
543 bool del = false,
544 http_exp_chunked_body = false,
545 ssl_accept_all_certs = false,
546 ssl_capture_remote_cert = false,
547 event_data = false,
548 sse_got_cr = false;
549 int in_op = -1,
550 ssl_verify_mode = SSL_VERIFY_NONE;
551
552 // issue #3512: the remote certificate captured
553 QoreObject* remote_cert = nullptr;
554
555 // issue #3818: verbose certificate verification error info
556 QoreStringNode* ssl_err_str = nullptr;
557
558 DLLLOCAL qore_socket_private(int n_sock = QORE_INVALID_SOCKET, int n_sfamily = AF_UNSPEC,
559 int n_stype = SOCK_STREAM, int n_prot = 0, const QoreEncoding* n_enc = QCS_DEFAULT) :
560 sock(n_sock), sfamily(n_sfamily), port(-1), stype(n_stype), sprot(n_prot), enc(n_enc) {
561 }
562
563 DLLLOCAL ~qore_socket_private() {
564 close_internal();
565
566 // must be dereferenced and removed before deleting
567 assert(!event_queue);
568 assert(!warn_queue);
569 }
570
571 DLLLOCAL bool isOpen() {
572 return sock != QORE_INVALID_SOCKET;
573 }
574
575 DLLLOCAL int close() {
576 return close_internal();
577 }
578
579 DLLLOCAL int close_and_reset() {
580 assert(sock != QORE_INVALID_SOCKET);
581 int rc;
582 while (true) {
583#ifdef _Q_WINDOWS
584 rc = ::closesocket(sock);
585#else
586 rc = ::close(sock);
587#endif
588 // try again if close was interrupted by a signal
589 if (!rc || sock_get_error() != EINTR)
590 break;
591 }
592 //printd(5, "qore_socket_private::close_and_reset(this: %p) close(%d) returned %d\n", this, sock, rc);
593 sock = QORE_INVALID_SOCKET;
594 if (buflen) {
595 buflen = 0;
596 }
597 if (bufoffset) {
598 bufoffset = 0;
599 }
600 if (del) {
601 del = false;
602 }
603 if (port != -1) {
604 port = -1;
605 }
606 if (in_op >= 0) {
607 in_op = -1;
608 }
609 if (http_exp_chunked_body) {
610 http_exp_chunked_body = false;
611 }
612 if (sse_got_cr) {
613 sse_got_cr = false;
614 }
615 sfamily = AF_UNSPEC;
616 stype = SOCK_STREAM;
617 sprot = 0;
618 // issue #3053: clear hostname for SNI
619 client_target.clear();
620 return rc;
621 }
622
623 DLLLOCAL int close_internal() {
624 //printd(5, "qore_socket_private::close_internal(this: %p) sock: %d\n", this, sock);
625 if (ssl_err_str) {
626 ssl_err_str->deref();
627 ssl_err_str = nullptr;
628 }
629 if (remote_cert) {
630 remote_cert->deref(nullptr);
631 remote_cert = nullptr;
632 }
633 if (sock >= 0) {
634 // if an SSL connection has been established, shut it down first
635 if (ssl) {
636 ssl->shutdown();
637 ssl->deref();
638 ssl = nullptr;
639 }
640
641 if (!socketname.empty()) {
642 if (del)
643 unlink(socketname.c_str());
644 socketname.clear();
645 }
646 do_close_event();
647 // issue #3558: increment the connection sequence here. so the connection sequence is different as soon as
648 // it's closed
649 ++connection_id;
650
651 return close_and_reset();
652 } else {
653 return 0;
654 }
655 }
656
657 DLLLOCAL void setAssumedEncoding(const char* str) {
658 assume_http_encoding = str;
659 }
660
661 DLLLOCAL const char* getAssumedEncoding() const {
662 return assume_http_encoding.c_str();
663 }
664
665 DLLLOCAL int getSendTimeout() const {
666 hashdecl timeval tv;
667
668#if defined(HPUX) && defined(__ia64) && defined(__LP64__)
669 // on HPUX 64-bit the OS defines socklen_t to be 8 bytes
670 // but the library expects a 32-bit value
671 int size = sizeof(hashdecl timeval);
672#else
673 socklen_t size = sizeof(hashdecl timeval);
674#endif
675
676 if (getsockopt(sock, SOL_SOCKET, SO_SNDTIMEO, (GETSOCKOPT_ARG_4)&tv, (socklen_t *)&size))
677 return -1;
678
679 return tv.tv_sec * 1000 + tv.tv_usec / 1000;
680 }
681
682 DLLLOCAL int getRecvTimeout() const {
683 hashdecl timeval tv;
684
685#if defined(HPUX) && defined(__ia64) && defined(__LP64__)
686 // on HPUX 64-bit the OS defines socklen_t to be 8 bytes
687 // but the library expects a 32-bit value
688 int size = sizeof(hashdecl timeval);
689#else
690 socklen_t size = sizeof(hashdecl timeval);
691#endif
692
693 if (getsockopt(sock, SOL_SOCKET, SO_RCVTIMEO, (GETSOCKOPT_ARG_4)&tv, (socklen_t *)&size))
694 return -1;
695
696 return tv.tv_sec * 1000 + tv.tv_usec / 1000;
697 }
698
699 DLLLOCAL int getPort() {
700 // if we don't need to find out what port we are, then return current value
701 if (sock == QORE_INVALID_SOCKET || (sfamily != AF_INET && sfamily != AF_INET6) || port > 0)
702 return port;
703
704 // otherwise find out what port we're connected to
705 hashdecl sockaddr_storage addr;
706#if defined(HPUX) && defined(__ia64) && defined(__LP64__)
707 // on HPUX 64-bit the OS defines socklen_t to be 8 bytes, but the library expects a 32-bit value
708 int size = sizeof addr;
709#else
710 socklen_t size = sizeof addr;
711#endif
712
713 if (getsockname(sock, (struct sockaddr *)&addr, (socklen_t *)&size) < 0)
714 return -1;
715
716 port = q_get_port_from_addr((const struct sockaddr *)&addr);
717 return port;
718 }
719
720 DLLLOCAL static void do_header(const char* key, QoreString& hdr, const QoreValue& v) {
721 switch (v.getType()) {
722 case NT_STRING:
723 hdr.sprintf("%s: %s\r\n", key, v.get<const QoreStringNode>()->c_str());
724 break;
725 case NT_INT:
726 hdr.sprintf("%s: " QLLD "\r\n", key, v.getAsBigInt());
727 break;
728 case NT_FLOAT: {
729 hdr.sprintf("%s: ", key);
730 size_t offset = hdr.size();
731 hdr.sprintf("%f\r\n", v.getAsFloat());
732 // issue 1556: external modules that call setlocale() can change
733 // the decimal point character used here from '.' to ','
734 // only search the double added, QoreString::sprintf() concatenates
735 q_fix_decimal(&hdr, offset);
736 break;
737 }
738 case NT_NUMBER:
739 hdr.sprintf("%s: ", key);
740 v.get<const QoreNumberNode>()->toString(hdr);
741 hdr.concat("\r\n");
742 break;
743 case NT_BOOLEAN:
744 hdr.sprintf("%s: %d\r\n", key, (int)v.getAsBool());
745 break;
746 }
747 }
748
749 // issue #3879: must add Content-Length if not present, even if there is no message body
752 DLLLOCAL static void do_headers(QoreString& hdr, const QoreHashNode* headers, size_t size, bool addsize = true) {
753 // RFC-2616 4.4 (http://tools.ietf.org/html/rfc2616#section-4.4)
754 // add Content-Length: 0 to headers for responses without a body where there is no transfer-encoding
755 if (headers) {
756 ConstHashIterator hi(headers);
757
758 while (hi.next()) {
759 const QoreValue v = hi.get();
760 const char* key = hi.getKey();
761 if (!size && addsize) {
762 if (!strcasecmp(key, "transfer-encoding")) {
763 addsize = false;
764 } else if (!strcasecmp(key, "content-type")
765 && (v.getType() == NT_STRING)
766 && (*v.get<const QoreStringNode>() == "text/event-stream")) {
767 addsize = false;
768 }
769 }
770 if ((addsize || size) && !strcasecmp(key, "content-length")) {
771 // ignore Content-Length given manually
772 continue;
773 }
774 if (v.getType() == NT_LIST) {
775 ConstListIterator li(v.get<const QoreListNode>());
776 while (li.next())
777 do_header(key, hdr, li.getValue());
778 } else
779 do_header(key, hdr, v);
780 }
781 }
782 // add data and content-length header if necessary
783 if (size || addsize) {
784 hdr.sprintf("Content-Length: %zu\r\n", size);
785 //printd(5, "qore_socket_private::do_headers() added Content-Length: %zu\n", size);
786 }
787
788 hdr.concat("\r\n");
789 }
790
791 DLLLOCAL int listen(int backlog = 20) {
792 if (sock == QORE_INVALID_SOCKET)
793 return QSE_NOT_OPEN;
794 if (in_op >= 0)
795 return QSE_IN_OP;
796#ifdef _Q_WINDOWS
797 if (::listen(sock, backlog)) {
798 // set errno
799 sock_get_error();
800 return -1;
801 }
802 return 0;
803#else
804 return ::listen(sock, backlog);
805#endif
806 }
807
808 DLLLOCAL int accept_intern(ExceptionSink* xsink, struct sockaddr *addr, socklen_t *size, int timeout_ms = -1) {
809 //printd(5, "qore_socket_private::accept_intern() to: %d\n", timeout_ms);
810 assert(xsink);
811 while (true) {
812 if (timeout_ms >= 0 && !isDataAvailable(timeout_ms, "accept", xsink)) {
813 if (*xsink)
814 return -1;
815 // do not throw exception here, NOTHING will be returned in Qore on timeout
816 return QSE_TIMEOUT; // -3
817 }
818
819 int rc = ::accept(sock, addr, size);
820 if (rc != QORE_INVALID_SOCKET)
821 return rc;
822
823 // retry if interrupted by a signal
824 if (sock_get_error() == EINTR)
825 continue;
826
827 qore_socket_error(xsink, "SOCKET-ACCEPT-ERROR", "error in accept()", 0, 0, 0, addr);
828 return -1;
829 }
830 }
831
832 // returns a new socket
833 DLLLOCAL int accept_internal(ExceptionSink* xsink, SocketSource* source, int timeout_ms = -1) {
834 assert(xsink);
835 if (sock == QORE_INVALID_SOCKET) {
836 xsink->raiseException("SOCKET-NOT-OPEN", "socket must be opened, bound, and in a listening state before "
837 "new connections can be accepted");
838 return QSE_NOT_OPEN;
839 }
840 if (in_op >= 0) {
841 if (in_op == q_gettid()) {
842 se_in_op("Socket", "accept", xsink);
843 return QSE_IN_OP;
844 }
845 se_in_op_thread("Socket", "accept", xsink);
846 return QSE_IN_OP_THREAD;
847 }
848
849 int rc;
850 if (sfamily == AF_UNIX) {
851#ifdef _Q_WINDOWS
852 xsink->raiseException("SOCKET-ACCEPT-ERROR", "UNIX sockets are not available under Windows");
853 return -1;
854#else
855 hashdecl sockaddr_un addr_un;
856
857#if defined(HPUX) && defined(__ia64) && defined(__LP64__)
858 // on HPUX 64-bit the OS defines socklen_t to be 8 bytes
859 // but the library expects a 32-bit value
860 int size = sizeof(hashdecl sockaddr_un);
861#else
862 socklen_t size = sizeof(hashdecl sockaddr_un);
863#endif
864 rc = accept_intern(xsink, (struct sockaddr *)&addr_un, (socklen_t *)&size, timeout_ms);
865 //printd(1, "qore_socket_private::accept_internal() " QSD " bytes returned\n", size);
866
867 if (rc >= 0 && source) {
868 QoreStringNode* addr = new QoreStringNode(enc);
869 addr->sprintf("UNIX socket: %s", socketname.c_str());
870 source->priv->setAddress(addr);
871 source->priv->setHostName("localhost");
872 }
873#endif // windows
874 } else if (sfamily == AF_INET || sfamily == AF_INET6) {
875 hashdecl sockaddr_storage addr_in;
876#if defined(HPUX) && defined(__ia64) && defined(__LP64__)
877 // on HPUX 64-bit the OS defines socklen_t to be 8 bytes
878 // but the library expects a 32-bit value
879 int size = sizeof(addr_in);
880#else
881 socklen_t size = sizeof(addr_in);
882#endif
883
884 rc = accept_intern(xsink, (struct sockaddr *)&addr_in, (socklen_t *)&size, timeout_ms);
885 //printd(1, "qore_socket_private::accept_internal() rc: %d, %d bytes returned\n", rc, size);
886
887 if (rc >= 0 && source) {
888 char host[NI_MAXHOST + 1];
889 char service[NI_MAXSERV + 1];
890
891 if (!getnameinfo((struct sockaddr *)&addr_in, qore_get_in_len((struct sockaddr *)&addr_in), host,
892 sizeof(host), service, sizeof(service), NI_NUMERICSERV)) {
893 source->priv->setHostName(host);
894 }
895
896 // get ipv4 or ipv6 address
897 char ifname[INET6_ADDRSTRLEN];
898 if (inet_ntop(addr_in.ss_family, qore_get_in_addr((struct sockaddr *)&addr_in), ifname,
899 sizeof(ifname))) {
900 //printd(5, "inet_ntop() '%s' host: '%s'\n", ifname, host);
901 source->priv->setAddress(ifname);
902 }
903 }
904 } else {
905 // should not happen
906 xsink->raiseException("SOCKET-ACCEPT-ERROR", "do not know how to accept connections with address "
907 "family %d", sfamily);
908 rc = -1;
909 }
910 return rc;
911 }
912
913 DLLLOCAL QoreHashNode* getEvent(int event, int source = QORE_SOURCE_SOCKET) const {
914 QoreHashNode* h = new QoreHashNode(autoTypeInfo);
915 if (event_arg) {
916 h->setKeyValue("arg", event_arg.refSelf(), nullptr);
917 }
918
919 h->setKeyValue("event", event, nullptr);
920 h->setKeyValue("source", source, nullptr);
921 h->setKeyValue("id", (int64)this, nullptr);
922
923 return h;
924 }
925
926 DLLLOCAL void cleanup(ExceptionSink* xsink) {
927 if (event_queue) {
928 // close the socket before the delete message is put on the queue
929 // the socket would be closed anyway in the destructor
930 close_internal();
931
932 event_queue->pushAndTakeRef(getEvent(QORE_EVENT_DELETED));
933
934 // deref and remove event queue
935 event_queue->deref(xsink);
936 event_queue = nullptr;
937 }
938 if (warn_queue) {
939 warn_queue->deref(xsink);
940 warn_queue = nullptr;
941 if (warn_callback_arg) {
942 warn_callback_arg.discard(xsink);
943 warn_callback_arg.clear();
944 }
945 }
946 }
947
948 DLLLOCAL void setEventQueue(ExceptionSink* xsink, Queue* q, QoreValue arg, bool with_data) {
949 if (event_queue) {
950 if (event_arg) {
951 event_arg.discard(xsink);
952 }
953 event_queue->deref(xsink);
954 }
955 event_queue = q;
956 event_arg = arg;
957 event_data = with_data;
958 }
959
960 DLLLOCAL void do_start_ssl_event() {
961 if (event_queue) {
962 event_queue->pushAndTakeRef(getEvent(QORE_EVENT_START_SSL));
963 }
964 }
965
966 DLLLOCAL void do_ssl_established_event() {
967 if (event_queue) {
968 QoreHashNode* h = getEvent(QORE_EVENT_SSL_ESTABLISHED);
969 h->setKeyValue("cipher", new QoreStringNode(ssl->getCipherName()), nullptr);
970 h->setKeyValue("cipher_version", new QoreStringNode(ssl->getCipherVersion()), nullptr);
971 event_queue->pushAndTakeRef(h);
972 }
973 }
974
975 DLLLOCAL void do_connect_event(int af, const struct sockaddr* addr, const char* target,
976 const char* service = nullptr, int prt = -1) {
977 if (event_queue) {
978 QoreHashNode* h = getEvent(QORE_EVENT_CONNECTING);
979 QoreStringNode* str = q_addr_to_string2(addr);
980 if (str) {
981 h->setKeyValue("address", str, nullptr);
982 } else {
983 h->setKeyValue("error", q_strerror(sock_get_error()), nullptr);
984 }
985 q_af_to_hash(af, *h, nullptr);
986 h->setKeyValue("target", new QoreStringNode(target), nullptr);
987 if (service)
988 h->setKeyValue("service", new QoreStringNode(service), nullptr);
989 if (prt != -1)
990 h->setKeyValue("port", prt, nullptr);
991 event_queue->pushAndTakeRef(h);
992 }
993 }
994
995 DLLLOCAL void do_connected_event() {
996 if (event_queue) {
997 event_queue->pushAndTakeRef(getEvent(QORE_EVENT_CONNECTED));
998 }
999 }
1000
1001 DLLLOCAL void do_data_event_intern(int event, int source, const QoreStringNode& str) const {
1002 assert(event_queue && event_data && str.size());
1003 ReferenceHolder<QoreHashNode> h(getEvent(event, source), nullptr);
1004 h->setKeyValue("data", str.refSelf(), nullptr);
1005 event_queue->pushAndTakeRef(h.release());
1006 }
1007
1008 DLLLOCAL void do_data_event(int event, int source, const QoreStringNode& str) const {
1009 if (event_queue && event_data && str.size()) {
1010 do_data_event_intern(event, source, str);
1011 }
1012 }
1013
1014 DLLLOCAL void do_data_event(int event, int source, const BinaryNode& b) const {
1015 if (event_queue && event_data && b.size()) {
1016 ReferenceHolder<QoreHashNode> h(getEvent(event, source), nullptr);
1017 h->setKeyValue("data", b.refSelf(), nullptr);
1018 event_queue->pushAndTakeRef(h.release());
1019 }
1020 }
1021
1022 DLLLOCAL void do_data_event(int event, int source, const void* data, size_t size) const {
1023 if (event_queue && event_data && size) {
1024 ReferenceHolder<QoreHashNode> h(getEvent(event, source), nullptr);
1026 b->append(data, size);
1027 h->setKeyValue("data", b.release(), nullptr);
1028 event_queue->pushAndTakeRef(h.release());
1029 }
1030 }
1031
1032 DLLLOCAL void do_header_event(int event, int source, const QoreHashNode& hdr) const {
1033 if (event_queue && event_data && !hdr.empty()) {
1034 ReferenceHolder<QoreHashNode> h(getEvent(event, source), nullptr);
1035 h->setKeyValue("headers", hdr.refSelf(), nullptr);
1036 event_queue->pushAndTakeRef(h.release());
1037 }
1038 }
1039
1040 DLLLOCAL void do_chunked_read(int event, size_t bytes, size_t total_read, int source) {
1041 if (event_queue) {
1042 QoreHashNode* h = getEvent(event, source);
1043 if (event == QORE_EVENT_HTTP_CHUNKED_DATA_RECEIVED)
1044 h->setKeyValue("read", bytes, nullptr);
1045 else
1046 h->setKeyValue("size", bytes, nullptr);
1047 h->setKeyValue("total_read", total_read, nullptr);
1048 event_queue->pushAndTakeRef(h);
1049 }
1050 }
1051
1052 DLLLOCAL void do_read_http_header(int event, const QoreHashNode* headers, int source) {
1053 if (event_queue) {
1054 QoreHashNode* h = getEvent(event, source);
1055 h->setKeyValue("headers", headers->hashRefSelf(), nullptr);
1056 event_queue->pushAndTakeRef(h);
1057 }
1058 }
1059
1060 DLLLOCAL void do_send_http_message_event(const QoreString& str, const QoreHashNode* headers, int source) {
1061 if (event_queue) {
1062 QoreHashNode* h = getEvent(QORE_EVENT_HTTP_SEND_MESSAGE, source);
1063 h->setKeyValue("message", new QoreStringNode(str), nullptr);
1064 //printd(5, "do_send_http_message_event() str='%s' headers: %p (%d %s)\n", str.c_str(), headers, headers->getType(), headers->getTypeName());
1065 h->setKeyValue("headers", headers->copy(), nullptr);
1066 event_queue->pushAndTakeRef(h);
1067 }
1068 }
1069
1070 DLLLOCAL void do_close_event() {
1071 if (event_queue) {
1072 event_queue->pushAndTakeRef(getEvent(QORE_EVENT_CHANNEL_CLOSED));
1073 }
1074 }
1075
1076 DLLLOCAL void do_read_event(size_t bytes_read, size_t total_read, size_t bufsize = 0, int source = QORE_SOURCE_SOCKET) {
1077 // post bytes read on event queue, if any
1078 if (event_queue) {
1079 QoreHashNode* h = getEvent(QORE_EVENT_PACKET_READ, source);
1080 h->setKeyValue("read", bytes_read, nullptr);
1081 h->setKeyValue("total_read", total_read, nullptr);
1082 // set total bytes to read and remaining bytes if bufsize > 0
1083 if (bufsize > 0)
1084 h->setKeyValue("total_to_read", bufsize, nullptr);
1085 event_queue->pushAndTakeRef(h);
1086 }
1087 }
1088
1089 DLLLOCAL void do_send_event(int bytes_sent, int total_sent, int bufsize) {
1090 // post bytes sent on event queue, if any
1091 if (event_queue) {
1092 QoreHashNode* h = getEvent(QORE_EVENT_PACKET_SENT);
1093 h->setKeyValue("sent", bytes_sent, nullptr);
1094 h->setKeyValue("total_sent", total_sent, nullptr);
1095 h->setKeyValue("total_to_send", bufsize, nullptr);
1096 event_queue->pushAndTakeRef(h);
1097 }
1098 }
1099
1100 DLLLOCAL void do_resolve_event(const char* host, const char* service = 0) {
1101 // post bytes sent on event queue, if any
1102 if (event_queue) {
1103 QoreHashNode* h = getEvent(QORE_EVENT_HOSTNAME_LOOKUP);
1104 if (host)
1105 h->setKeyValue("name", new QoreStringNode(host), nullptr);
1106 if (service)
1107 h->setKeyValue("service", new QoreStringNode(service), nullptr);
1108 event_queue->pushAndTakeRef(h);
1109 }
1110 }
1111
1112 DLLLOCAL void do_resolved_event(const struct sockaddr* addr) {
1113 // post bytes sent on event queue, if any
1114 if (event_queue) {
1115 QoreHashNode* h = getEvent(QORE_EVENT_HOSTNAME_RESOLVED);
1116 QoreStringNode* str = q_addr_to_string2(addr);
1117 if (str)
1118 h->setKeyValue("address", str, nullptr);
1119 else
1120 h->setKeyValue("error", q_strerror(sock_get_error()), nullptr);
1121 int prt = q_get_port_from_addr(addr);
1122 if (prt > 0)
1123 h->setKeyValue("port", prt, nullptr);
1124 q_af_to_hash(addr->sa_family, *h, nullptr);
1125 event_queue->pushAndTakeRef(h);
1126 }
1127 }
1128
1129 DLLLOCAL int64 getObjectIDForEvents() const {
1130 return (int64)this;
1131 }
1132
1133 DLLLOCAL int connectUNIX(const char* p, int sock_type, int protocol, ExceptionSink* xsink) {
1134 assert(xsink);
1135 assert(p);
1136 QORE_TRACE("connectUNIX()");
1137
1138#ifdef _Q_WINDOWS
1139 xsink->raiseException("SOCKET-CONNECTUNIX-ERROR", "UNIX sockets are not available under Windows");
1140 return -1;
1141#else
1142 // close socket if already open
1143 close();
1144
1145 printd(5, "qore_socket_private::connectUNIX(%s)\n", p);
1146
1147 hashdecl sockaddr_un addr;
1148
1149 addr.sun_family = AF_UNIX;
1150 // copy path and terminate if necessary
1151 strncpy(addr.sun_path, p, sizeof(addr.sun_path) - 1);
1152 addr.sun_path[sizeof(addr.sun_path) - 1] = '\0';
1153 if ((sock = socket(AF_UNIX, sock_type, protocol)) == QORE_SOCKET_ERROR) {
1154 xsink->raiseErrnoException("SOCKET-CONNECT-ERROR", errno, "error connecting to UNIX socket: '%s'", p);
1155 return -1;
1156 }
1157
1158 do_connect_event(AF_UNIX, (sockaddr*)&addr, p);
1159 while (true) {
1160 if (!::connect(sock, (const sockaddr *)&addr, sizeof(struct sockaddr_un)))
1161 break;
1162
1163 // try again if we were interrupted by a signal
1164 if (sock_get_error() == EINTR)
1165 continue;
1166
1167 // otherwise close the socket and return an exception with the error code
1168 // do not have to worry about windows API calls here; this is a UNIX-only function
1169 close_and_reset();
1170 qore_socket_error(xsink, "SOCKET-CONNECT-ERROR", "error in connect()", 0, p);
1171
1172 return -1;
1173 }
1174
1175 // save file name for deleting when socket is closed
1176 socketname = addr.sun_path;
1177 sfamily = AF_UNIX;
1178
1179 do_connected_event();
1180
1181 return 0;
1182#endif // windows
1183 }
1184
1185 // socket must be open or -1 is returned and a Qore-language exception is raised
1186 /* return values:
1187 -1: error
1188 0: timeout
1189 > 0: I/O can continue
1190 */
1191 DLLLOCAL int asyncIoWait(int timeout_ms, bool read, bool write, const char* cname, const char* mname,
1192 ExceptionSink* xsink) const {
1193 assert(xsink);
1194 assert(read || write);
1195 if (sock == QORE_INVALID_SOCKET) {
1196 se_not_open(cname, mname, xsink, "asyncIoWait");
1197 return -1;
1198 }
1199
1200 return asyncIoWait(timeout_ms, read, write, xsink);
1201 }
1202
1203 DLLLOCAL int asyncIoWait(int timeout_ms, bool read, bool write, ExceptionSink* xsink) const {
1204 assert(xsink);
1205#if defined HAVE_POLL
1206 return poll_intern(xsink, timeout_ms, read, write);
1207#elif defined HAVE_SELECT
1208 return select_intern(xsink, timeout_ms, read, write);
1209#else
1210#error no async socket operations supported
1211#endif
1212 }
1213
1214#if defined HAVE_POLL
1215 DLLLOCAL int poll_intern(ExceptionSink* xsink, int timeout_ms, bool read, bool write) const {
1216 int rc;
1217 short arg = 0;
1218 if (read)
1219 arg |= POLLIN;
1220 if (write)
1221 arg |= POLLOUT;
1222 pollfd fds = {sock, arg, 0};
1223 while (true) {
1224 rc = ::poll(&fds, 1, timeout_ms);
1225 if (rc == -1 && errno == EINTR)
1226 continue;
1227 break;
1228 }
1229 if (rc < 0)
1230 qore_socket_error(xsink, "SOCKET-SELECT-ERROR", "poll(2) returned an error");
1231 else if (!rc && ((fds.revents & POLLHUP) || (fds.revents & (POLLERR|POLLNVAL))))
1232 rc = -1;
1233
1234 return rc;
1235 }
1236#elif defined HAVE_SELECT
1237 DLLLOCAL int select_intern(ExceptionSink* xsink, int timeout_ms, bool read, bool write) const {
1238 bool aborted = false;
1239 int rc = select_intern(xsink, timeout_ms, read, write, aborted);
1240 if (rc != QORE_SOCKET_ERROR && aborted)
1241 rc = -1;
1242 return rc;
1243 }
1244
1245 DLLLOCAL int select_intern(ExceptionSink* xsink, int timeout_ms, bool read, bool write, bool& aborted) const {
1246 assert(xsink);
1247 assert(!aborted);
1248 // windows does not use FD_SETSIZE to limit the value of the highest socket descriptor in the set
1249 // instead it has a maximum of 64 sockets in the set; we only need one anyway
1250#ifndef _Q_WINDOWS
1251 // select is inherently broken since it can only handle descriptors < FD_SETSIZE, which is 1024 on Linux for example
1252 if (sock >= FD_SETSIZE) {
1253 xsink->raiseException("SOCKET-SELECT-ERROR", "fd is %d which is >= %d; contact the Qore developers to implement an alternative to select() on this platform", sock, FD_SETSIZE);
1254 return -1;
1255 }
1256#endif
1257 hashdecl timeval tv;
1258 int rc;
1259 while (true) {
1260 // to be safe, we set the file descriptor arg after each EINTR (required on Linux for example)
1261 fd_set sfs, err;
1262
1263 FD_ZERO(&sfs);
1264 FD_ZERO(&err);
1265 FD_SET(sock, &sfs);
1266 FD_SET(sock, &err);
1267
1268 tv.tv_sec = timeout_ms / 1000;
1269 tv.tv_usec = (timeout_ms % 1000) * 1000;
1270
1271 fd_set* readfd = read ? &sfs : 0;
1272 fd_set* writefd = write ? &sfs : 0;
1273
1274 rc = select(sock + 1, readfd, writefd, &err, &tv);
1275 //printd(5, "select_intern() rc: %d err: %d\n", rc, FD_ISSET(sock, &err));
1276 if (rc != QORE_SOCKET_ERROR) {
1277 if (FD_ISSET(sock, &err))
1278 aborted = true;
1279 break;
1280 }
1281 if (sock_get_error() != EINTR)
1282 break;
1283 }
1284 if (rc == QORE_SOCKET_ERROR) {
1285 // do not close the socket here, even in case of EBADF, just return an error
1286 rc = 0;
1287 qore_socket_error(xsink, "SOCKET-SELECT-ERROR", "select(2) returned an error");
1288 }
1289
1290 return rc;
1291 }
1292#endif
1293
1294 DLLLOCAL bool tryReadSocketData(const char* mname, ExceptionSink* xsink) {
1295 assert(xsink);
1296 assert(!buflen);
1297 if (!ssl) {
1298 // issue #3564: see if any data is available on the socket
1299 return asyncIoWait(0, true, false, "Socket", mname, xsink);
1300 }
1301 // select can return true if there is protocol negotiation data available,
1302 // so we try to peek 1 byte of application data with a timeout of 0 with the SSL connection
1303 int rc = ssl->doSSLRW(xsink, mname, rbuf, 1, 0, PEEK, false);
1304 if (*xsink || (rc == QSE_TIMEOUT)) {
1305 return false;
1306 }
1307 return rc > 0 ? true : false;
1308 }
1309
1310 DLLLOCAL bool isSocketDataAvailable(int timeout_ms, const char* mname, ExceptionSink* xsink) {
1311 return asyncIoWait(timeout_ms, true, false, "Socket", mname, xsink);
1312 }
1313
1314 DLLLOCAL bool isDataAvailable(int timeout_ms, const char* mname, ExceptionSink* xsink) {
1315 if (buflen)
1316 return true;
1317 return isSocketDataAvailable(timeout_ms, mname, xsink);
1318 }
1319
1320 DLLLOCAL bool isWriteFinished(int timeout_ms, const char* mname, ExceptionSink* xsink) {
1321 return asyncIoWait(timeout_ms, false, true, "Socket", mname, xsink);
1322 }
1323
1324 DLLLOCAL int close_and_exit() {
1325 if (sock != QORE_INVALID_SOCKET)
1326 close_and_reset();
1327 return -1;
1328 }
1329
// Starts a connect() on the socket and waits up to timeout_ms for it to complete.
//
// Returns 0 when the connection is established (either immediately or after the wait, as decided
// by checkConnected()); returns -1 otherwise.
//
// Error reporting:
// - non-Windows: connect() is retried on EINTR; for any errno other than EINPROGRESS the function
//   returns -1 WITHOUT raising an exception here
// - Windows: any socket error other than EAGAIN raises SOCKET-CONNECT-ERROR
// - a wait result of 0 raises SOCKET-CONNECT-ERROR with a "timeout in connection" description
// - when only_timeout is true, the SOCKET-CONNECT-ERROR for a failed wait is suppressed, and the
//   flag is passed on to checkConnected()
1330 DLLLOCAL int connectINETTimeout(int timeout_ms, const struct sockaddr* ai_addr, size_t ai_addrlen,
1331 ExceptionSink* xsink, bool only_timeout) {
1332 assert(xsink);
1333 PrivateQoreSocketTimeoutHelper toh(this, "connect");
1334
1335 while (true) {
// connected immediately
1336 if (!::connect(sock, ai_addr, ai_addrlen))
1337 return 0;
1338
1339#ifdef _Q_WINDOWS
1340 if (sock_get_error() != EAGAIN) {
1341 qore_socket_error(xsink, "SOCKET-CONNECT-ERROR", "error in connect()", 0, 0, 0, ai_addr);
1342 break;
1343 }
1344#else
1345 // try again if we were interrupted by a signal
1346 if (errno == EINTR)
1347 continue;
1348
// any other error than "connection in progress" ends the attempt; no exception is raised here
1349 if (errno != EINPROGRESS)
1350 break;
1351#endif
1352
1353 //printd(5, "qore_socket_private::connectINETTimeout() timeout_ms: %d errno: %d\n", timeout_ms, errno);
1354
1355 // check for timeout or connection with EINPROGRESS
// note: every path through the body of this inner loop returns, so it executes at most once
1356 while (true) {
1357#ifdef _Q_WINDOWS
1358 bool aborted = false;
1359 int rc = select_intern(xsink, timeout_ms, false, true, aborted);
1360
1361 //printd(5, "qore_socket_private::connectINETTimeout() timeout_ms: %d rc: %d aborted: %d\n",
1362 // timeout_ms, rc, aborted);
1363
1364 // windows select() returns an error in the error socket set instead of an WSAECONNREFUSED error like
1365 // UNIX, so we simulate it here
1366 if (rc != QORE_SOCKET_ERROR && aborted) {
1367 qore_socket_error(xsink, "SOCKET-CONNECT-ERROR", "error in connect()", 0, 0, 0, ai_addr);
1368 return -1;
1369 }
1370#else
// wait until the socket becomes writable
1371 int rc = asyncIoWait(timeout_ms, false, true, "Socket", "connectINETTimeout", xsink);
1372#endif
// an exception raised while waiting always ends the attempt
1373 if (*xsink)
1374 return -1;
1375
1376 //printd(5, "asyncIoWait(%d) returned %d\n", timeout_ms, rc);
1377 if (rc == QORE_SOCKET_ERROR && sock_get_error() != EINTR) {
1378 if (!only_timeout)
1379 qore_socket_error(xsink, "SOCKET-CONNECT-ERROR", "error in asyncIoWait() with "
1380 "Socket::connect() with timeout", 0, 0, 0, ai_addr);
1381 return -1;
1382 } else if (rc > 0) {
// socket is writable: checkConnected() decides whether the connection succeeded
1383 return checkConnected(xsink, nullptr, ai_addr, only_timeout);
1384 } else {
// remaining case: report a connection timeout
1385 SimpleRefHolder<QoreStringNode> desc(new QoreStringNodeMaker("timeout in connection after %dms",
1386 timeout_ms));
1387 concat_target(*(*desc), ai_addr);
1388 xsink->raiseException("SOCKET-CONNECT-ERROR", desc.release());
1389 return -1;
1390 }
1391 }
1392 }
1393
1394 return -1;
1395 }
1396
1397 DLLLOCAL int checkConnected(ExceptionSink* xsink, const char* hostsvc, const struct sockaddr* ai_addr = nullptr,
1398 bool only_timeout = false) {
1399 assert(sock);
1400
1401 // socket selected for write
1402 socklen_t lon = sizeof(int);
1403 int val;
1404
1405 if (getsockopt(sock, SOL_SOCKET, SO_ERROR, (GETSOCKOPT_ARG_4)(&val), &lon) == QORE_SOCKET_ERROR) {
1406 if (!only_timeout) {
1407 qore_socket_error(xsink, "SOCKET-CONNECT-ERROR", "error in getsockopt()", nullptr, hostsvc, nullptr,
1408 ai_addr);
1409 }
1410 return -1;
1411 }
1412
1413 if (val) {
1414 if (only_timeout) {
1415 errno = val;
1416 return -1;
1417 }
1418 qore_socket_error_intern(val, xsink, "SOCKET-CONNECT-ERROR", "error in getsockopt()", nullptr, hostsvc,
1419 nullptr, ai_addr);
1420 return -1;
1421 }
1422
1423 // connected successfully within the timeout period
1424 return 0;
1425 }
1426
1427 DLLLOCAL void confirmConnected(const char* host) {
1428 do_connected_event();
1429
1430 // issue #3053: save hostname for SNI
1431 if (host) {
1432 client_target = host;
1433 }
1434 }
1435
1436 DLLLOCAL int sock_errno_err(const char* err, const char* desc, ExceptionSink* xsink) {
1437 //sock = QORE_INVALID_SOCKET;
1438 qore_socket_error(xsink, err, desc);
1439 return -1;
1440 }
1441
1442 DLLLOCAL int set_non_blocking(bool non_blocking, ExceptionSink* xsink) {
1443 assert(xsink);
1444 // ignore call when socket already closed
1445 if (sock == QORE_INVALID_SOCKET) {
1446 assert(*xsink);
1447 return -1;
1448 }
1449
1450#ifdef _Q_WINDOWS
1451 u_long mode = non_blocking ? 1 : 0;
1452 int rc = ioctlsocket(sock, FIONBIO, &mode);
1453 if (check_windows_rc(rc)) {
1454 return sock_errno_err("SOCKET-CONNECT-ERROR", "error in ioctlsocket(FIONBIO)", xsink);
1455 }
1456#else
1457 int arg;
1458
1459 // get socket descriptor status flags
1460 if ((arg = fcntl(sock, F_GETFL, 0)) < 0) {
1461 return sock_errno_err("SOCKET-CONNECT-ERROR", "error in fcntl() getting socket descriptor status "
1462 "flag", xsink);
1463 }
1464
1465 if (non_blocking) { // set non-blocking
1466 arg |= O_NONBLOCK;
1467 } else { // set blocking
1468 arg &= ~O_NONBLOCK;
1469 }
1470
1471 if (fcntl(sock, F_SETFL, arg) < 0) {
1472 return sock_errno_err("SOCKET-CONNECT-ERROR", "error in fcntl() setting socket descriptor status "
1473 "flag", xsink);
1474 }
1475#endif
1476 //printd(5, "qore_socket_private::set_non_blocking() set: %d\n", non_blocking);
1477
1478 return 0;
1479 }
1480
1481 DLLLOCAL int connectINET(const char* host, const char* service, int timeout_ms, ExceptionSink* xsink,
1482 int family = AF_UNSPEC, int type = SOCK_STREAM, int protocol = 0) {
1483 assert(xsink);
1484 family = q_get_af(family);
1485 type = q_get_sock_type(type);
1486
1487 QORE_TRACE("qore_socket_private::connectINET()");
1488
1489 // close socket if already open
1490 close();
1491
1492 printd(5, "qore_socket_private::connectINET(%s:%s, %dms)\n", host, service, timeout_ms);
1493
1494 do_resolve_event(host, service);
1495
1496 QoreAddrInfo ai;
1497 if (ai.getInfo(xsink, host, service, family, 0, type, protocol))
1498 return -1;
1499
1500 hashdecl addrinfo* aip = ai.getAddrInfo();
1501
1502 // emit all "resolved" events
1503 if (event_queue)
1504 for (struct addrinfo* p = aip; p; p = p->ai_next)
1505 do_resolved_event(p->ai_addr);
1506
1507 int prt = q_get_port_from_addr(aip->ai_addr);
1508
1509 for (struct addrinfo* p = aip; p; p = p->ai_next) {
1510 if (!connectINETIntern(host, service, p->ai_family, p->ai_addr, p->ai_addrlen, p->ai_socktype,
1511 p->ai_protocol, prt, timeout_ms, xsink, true)) {
1512 return 0;
1513 }
1514 if (*xsink) {
1515 break;
1516 }
1517 }
1518
1519 if (!*xsink) {
1520 qore_socket_error(xsink, "SOCKET-CONNECT-ERROR", "error in connect()", 0, host, service);
1521 }
1522 return -1;
1523 }
1524
1525 DLLLOCAL int connectINETIntern(const char* host, const char* service, int ai_family, struct sockaddr* ai_addr,
1526 size_t ai_addrlen, int ai_socktype, int ai_protocol, int prt, int timeout_ms, ExceptionSink* xsink,
1527 bool only_timeout = false) {
1528 assert(xsink);
1529 printd(5, "qore_socket_private::connectINETIntern() host: %s service: %s family: %d timeout_ms: %d\n", host,
1530 service, ai_family, timeout_ms);
1531 if ((sock = socket(ai_family, ai_socktype, ai_protocol)) == QORE_INVALID_SOCKET) {
1532 xsink->raiseErrnoException("SOCKET-CONNECT-ERROR", errno, "cannot establish a connection to %s:%s", host,
1533 service);
1534 return -1;
1535 }
1536
1537 //printd(5, "qore_socket_private::connectINETIntern(this: %p, host: '%s', port: %d, timeout_ms: %d) "
1538 // "sock: %d\n", this, host, port, timeout_ms, sock);
1539
1540 int rc;
1541
1542 // perform connect with timeout if a non-negative timeout was passed
1543 if (timeout_ms >= 0) {
1544 // set non-blocking
1545 if (set_non_blocking(true, xsink))
1546 return close_and_exit();
1547
1548 do_connect_event(ai_family, ai_addr, host, service, prt);
1549
1550 rc = connectINETTimeout(timeout_ms, ai_addr, ai_addrlen, xsink, only_timeout);
1551 //printd(5, "qore_socket_private::connectINETIntern() errno: %d rc: %d, xsink: %d\n", errno, rc, xsink && *xsink);
1552
1553 // set blocking
1554 if (set_non_blocking(false, xsink))
1555 return close_and_exit();
1556 } else {
1557 do_connect_event(ai_family, ai_addr, host, service, prt);
1558
1559 while (true) {
1560 rc = ::connect(sock, ai_addr, ai_addrlen);
1561
1562 // try again if rc == -1 and errno == EINTR
1563 if (!rc || sock_get_error() != EINTR)
1564 break;
1565 }
1566 }
1567
1568 if (rc < 0) {
1569 if (!only_timeout || errno == ETIMEDOUT)
1570 qore_socket_error(xsink, "SOCKET-CONNECT-ERROR", "error in connect()", 0, host, service);
1571
1572 return close_and_exit();
1573 }
1574
1575 sfamily = ai_family;
1576 stype = ai_socktype;
1577 sprot = ai_protocol;
1578 port = prt;
1579 //printd(5, "qore_socket_private::connectINETIntern(this: %p, host='%s', port: %d, timeout_ms: %d) success, "
1580 // "rc: %d, sock: %d\n", this, host, port, timeout_ms, rc, sock);
1581
1582 confirmConnected(host);
1583 return 0;
1584 }
1585
1586 DLLLOCAL int upgradeClientToSSLIntern(ExceptionSink* xsink, const char* mname, const char* sni_target_host,
1587 int timeout_ms, QoreSSLCertificate* cert = nullptr, QoreSSLPrivateKey* pkey = nullptr) {
1588 assert(!ssl);
1589 SSLSocketHelperHelper sshh(this, true);
1590
1591 int rc;
1592 do_start_ssl_event();
1593 // issue #3053: send target hostname to support SNI
1594 if (!sni_target_host && !client_target.empty()) {
1595 sni_target_host = client_target.c_str();
1596 }
1597 if ((rc = ssl->setClient(xsink, mname, sni_target_host, sock, cert, pkey))
1598 || ssl->connect(mname, timeout_ms,
1599 xsink)) {
1600 sshh.error();
1601 return rc ? rc : -1;
1602 }
1603 do_ssl_established_event();
1604
1605 return 0;
1606 }
1607
1608 DLLLOCAL int upgradeServerToSSLIntern(ExceptionSink* xsink, const char* mname, int timeout_ms,
1609 QoreSSLCertificate* cert = nullptr, QoreSSLPrivateKey* pkey = nullptr) {
1610 assert(!ssl);
1611 //printd(5, "qore_socket_private::upgradeServerToSSLIntern() this: %p mode: %d\n", this, ssl_verify_mode);
1612 SSLSocketHelperHelper sshh(this, true);
1613
1614 do_start_ssl_event();
1615 if (ssl->setServer(xsink, mname, sock, cert, pkey) || ssl->accept(mname, timeout_ms, xsink)) {
1616 sshh.error();
1617 return -1;
1618 }
1619 do_ssl_established_event();
1620
1621 return 0;
1622 }
1623
1624 // returns 0 = success, -1 = error
1625 DLLLOCAL int openUNIX(int sock_type = SOCK_STREAM, int protocol = 0) {
1626 if (sock != QORE_INVALID_SOCKET)
1627 close();
1628
1629 if ((sock = socket(AF_UNIX, sock_type, protocol)) == QORE_INVALID_SOCKET) {
1630 return -1;
1631 }
1632
1633 sfamily = AF_UNIX;
1634 stype = sock_type;
1635 sprot = protocol;
1636 port = -1;
1637 return 0;
1638 }
1639
1640 // returns 0 = success, -1 = error
1641 DLLLOCAL int openINET(int family = AF_INET, int sock_type = SOCK_STREAM, int protocol = 0) {
1642 if (sock != QORE_INVALID_SOCKET)
1643 close();
1644
1645 if ((sock = socket(family, sock_type, protocol)) == QORE_INVALID_SOCKET)
1646 return -1;
1647
1648 sfamily = family;
1649 stype = sock_type;
1650 sprot = protocol;
1651 port = -1;
1652 return 0;
1653 }
1654
1655 DLLLOCAL int reuse(int opt) {
1656 //printf("qore_socket_private::reuse(%s)\n", opt ? "true" : "false");
1657 return setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (SETSOCKOPT_ARG_4)&opt, sizeof(int));
1658 }
1659
1660 // the only place where xsink is optional
1661 DLLLOCAL int bindIntern(struct sockaddr* ai_addr, size_t ai_addrlen, int prt, bool reuseaddr, ExceptionSink* xsink = 0) {
1662 reuse(reuseaddr);
1663
1664 if ((::bind(sock, ai_addr, ai_addrlen)) == QORE_SOCKET_ERROR) {
1665 if (xsink)
1666 qore_socket_error(xsink, "SOCKET-BIND-ERROR", "error in bind()", 0, 0, 0, ai_addr);
1667 close();
1668 return -1;
1669 }
1670
1671 // set port number
1672 if (prt)
1673 port = prt;
1674 else {
1675 // get port number
1676#if defined(HPUX) && defined(__ia64) && defined(__LP64__)
1677 // on HPUX 64-bit the OS defines socklen_t to be 8 bytes, but the library expects a 32-bit value
1678 int len = ai_addrlen;
1679#else
1680 socklen_t len = ai_addrlen;
1681#endif
1682
1683 if (getsockname(sock, ai_addr, &len))
1684 port = -1;
1685 else
1686 port = q_get_port_from_addr(ai_addr);
1687 }
1688 return 0;
1689 }
1690
1691 // bind to UNIX domain socket file
1692 DLLLOCAL int bindUNIX(ExceptionSink* xsink, const char* name, int socktype = SOCK_STREAM, int protocol = 0) {
1693 assert(xsink);
1694#ifdef _Q_WINDOWS
1695 xsink->raiseException("SOCKET-BINDUNIX-ERROR", "UNIX sockets are not available under Windows");
1696 return -1;
1697#else
1698 close();
1699
1700 // try to open socket if necessary
1701 if (openUNIX(socktype, protocol)) {
1702 xsink->raiseErrnoException("SOCKET-BIND-ERROR", errno, "error opening UNIX socket ('%s') for bind", name);
1703 return -1;
1704 }
1705
1706 hashdecl sockaddr_un addr;
1707 addr.sun_family = AF_UNIX;
1708 // copy path and terminate if necessary
1709 strncpy(addr.sun_path, name, sizeof(addr.sun_path) - 1);
1710 addr.sun_path[sizeof(addr.sun_path) - 1] = '\0';
1711
1712 if (bindIntern((sockaddr*)&addr, sizeof(struct sockaddr_un), -1, false, xsink))
1713 return -1;
1714
1715 // save socket file name for deleting on close
1716 socketname = addr.sun_path;
1717 // delete UNIX domain socket on close
1718 del = true;
1719 return 0;
1720#endif // windows
1721 }
1722
1723 DLLLOCAL int bindINET(ExceptionSink* xsink, const char* name, const char* service, bool reuseaddr = true, int family = AF_UNSPEC, int socktype = SOCK_STREAM, int protocol = 0) {
1724 assert(xsink);
1725 family = q_get_af(family);
1726 socktype = q_get_sock_type(socktype);
1727
1728 close();
1729
1730 QoreAddrInfo ai;
1731 do_resolve_event(name, service);
1732 if (ai.getInfo(xsink, name, service, family, AI_PASSIVE, socktype, protocol))
1733 return -1;
1734
1735 hashdecl addrinfo* aip = ai.getAddrInfo();
1736 // first emit all "resolved" events
1737 if (event_queue)
1738 for (struct addrinfo* p = aip; p; p = p->ai_next)
1739 do_resolved_event(p->ai_addr);
1740
1741 // try to open socket if necessary
1742 if (openINET(aip->ai_family, aip->ai_socktype, protocol)) {
1743 qore_socket_error(xsink, "SOCKET-BINDINET-ERROR", "error opening socket for bind", 0, name, service);
1744 return -1;
1745 }
1746
1747 int prt = q_get_port_from_addr(aip->ai_addr);
1748
1749 int en = 0;
1750 // iterate through addresses and bind to the first interface possible
1751 for (struct addrinfo* p = aip; p; p = p->ai_next) {
1752 if (!bindIntern(p->ai_addr, p->ai_addrlen, prt, reuseaddr)) {
1753 //printd(5, "qore_socket_private::bindINET(family: %d) bound: name: %s service: %s f: %d st: %d p: %d\n", family, name ? name : "(null)", service ? service : "(null)", p->ai_family, p->ai_socktype, p->ai_protocol);
1754 return 0;
1755 }
1756
1757 en = sock_get_raw_error();
1758 //printd(5, "qore_socket_private::bindINET() failed to bind: name: %s service: %s f: %d st: %d p: %d, errno: %d (%s)\n", name ? name : "(null)", service ? service : "(null)", p->ai_family, p->ai_socktype, p->ai_protocol, en, strerror(en));
1759 }
1760
1761 // if no bind was possible, then raise an exception
1762 qore_socket_error_intern(en, xsink, "SOCKET-BIND-ERROR", "error binding on socket", 0, name, service);
1763 return -1;
1764 }
1765
1766 // only called from qore-bound code - always with xsink
1767 DLLLOCAL QoreHashNode* getPeerInfo(ExceptionSink* xsink, bool host_lookup = true) const {
1768 assert(xsink);
1769 if (sock == QORE_INVALID_SOCKET) {
1770 se_not_open("Socket", "getPeerInfo", xsink);
1771 return 0;
1772 }
1773
1774 hashdecl sockaddr_storage addr;
1775 socklen_t len = sizeof addr;
1776 if (getpeername(sock, (struct sockaddr*)&addr, &len)) {
1777 qore_socket_error(xsink, "SOCKET-GETPEERINFO-ERROR", "error in getpeername()");
1778 return 0;
1779 }
1780
1781 return getAddrInfo(addr, len, host_lookup);
1782 }
1783
1784 // only called from qore-bound code - always with xsink
1785 DLLLOCAL QoreHashNode* getSocketInfo(ExceptionSink* xsink, bool host_lookup = true) const {
1786 assert(xsink);
1787 if (sock == QORE_INVALID_SOCKET) {
1788 se_not_open("Socket", "getSocketInfo", xsink);
1789 return 0;
1790 }
1791
1792 hashdecl sockaddr_storage addr;
1793#if defined(HPUX) && defined(__ia64) && defined(__LP64__)
1794 // on HPUX 64-bit the OS defines socklen_t to be 8 bytes, but the library expects a 32-bit value
1795 int len = sizeof addr;
1796#else
1797 socklen_t len = sizeof addr;
1798#endif
1799
1800 if (getsockname(sock, (struct sockaddr*)&addr, &len)) {
1801 qore_socket_error(xsink, "SOCKET-GETSOCKETINFO-ERROR", "error in getsockname()");
1802 return 0;
1803 }
1804
1805 return getAddrInfo(addr, len, host_lookup);
1806 }
1807
1808 DLLLOCAL QoreHashNode* getAddrInfo(const struct sockaddr_storage& addr, socklen_t len, bool host_lookup = true) const {
1809 QoreHashNode* h = new QoreHashNode(autoTypeInfo);
1810
1811 if (addr.ss_family == AF_INET || addr.ss_family == AF_INET6) {
1812 if (host_lookup) {
1813 char host[NI_MAXHOST + 1];
1814
1815 if (!getnameinfo((struct sockaddr*)&addr, qore_get_in_len((struct sockaddr*)&addr), host, sizeof(host), 0, 0, 0)) {
1816 QoreStringNode* hoststr = new QoreStringNode(host);
1817 h->setKeyValue("hostname", hoststr, 0);
1818 h->setKeyValue("hostname_desc", QoreAddrInfo::getAddressDesc(addr.ss_family, hoststr->c_str()), 0);
1819 }
1820 }
1821
1822 // get ipv4 or ipv6 address
1823 char ifname[INET6_ADDRSTRLEN];
1824 if (inet_ntop(addr.ss_family, qore_get_in_addr((struct sockaddr*)&addr), ifname, sizeof(ifname))) {
1825 QoreStringNode* addrstr = new QoreStringNode(ifname);
1826 h->setKeyValue("address", addrstr, 0);
1827 h->setKeyValue("address_desc", QoreAddrInfo::getAddressDesc(addr.ss_family, addrstr->c_str()), 0);
1828 }
1829
1830 int tport;
1831 if (addr.ss_family == AF_INET) {
1832 hashdecl sockaddr_in* s = (hashdecl sockaddr_in*)&addr;
1833 tport = ntohs(s->sin_port);
1834 } else {
1835 hashdecl sockaddr_in6* s = (hashdecl sockaddr_in6*)&addr;
1836 tport = ntohs(s->sin6_port);
1837 }
1838
1839 h->setKeyValue("port", tport, 0);
1840 }
1841#ifndef _Q_WINDOWS
1842 else if (addr.ss_family == AF_UNIX) {
1843 assert(!socketname.empty());
1844 QoreStringNode* addrstr = new QoreStringNode(socketname);
1845 h->setKeyValue("address", addrstr, 0);
1846 h->setKeyValue("address_desc", QoreAddrInfo::getAddressDesc(addr.ss_family, addrstr->c_str()), 0);
1847 }
1848#endif
1849
1850 h->setKeyValue("family", addr.ss_family, 0);
1851 h->setKeyValue("familystr", new QoreStringNode(QoreAddrInfo::getFamilyName(addr.ss_family)), 0);
1852
1853 return h;
1854 }
1855
1856 // set backwards-compatible object members on accept
1857 // to be (hopefully) deleted in a future version of qore
1858 DLLLOCAL void setAccept(QoreObject* o) {
1859 hashdecl sockaddr_storage addr;
1860
1861 socklen_t len = sizeof addr;
1862 if (getpeername(sock, (struct sockaddr*)&addr, &len))
1863 return;
1864
1865 if (addr.ss_family == AF_INET || addr.ss_family == AF_INET6) {
1866 // get ipv4 or ipv6 address
1867 char ifname[INET6_ADDRSTRLEN];
1868 if (inet_ntop(addr.ss_family, qore_get_in_addr((struct sockaddr *)&addr), ifname, sizeof(ifname))) {
1869 //printd(5, "inet_ntop() '%s' host: '%s'\n", ifname, host);
1870 o->setValue("source", new QoreStringNode(ifname), 0);
1871 }
1872
1873 char host[NI_MAXHOST + 1];
1874 if (!getnameinfo((struct sockaddr *)&addr, qore_get_in_len((struct sockaddr *)&addr), host, sizeof(host),
1875 0, 0, 0)) {
1876 o->setValue("source_host", new QoreStringNode(host), 0);
1877 }
1878 }
1879#ifndef _Q_WINDOWS
1880 else if (addr.ss_family == AF_UNIX) {
1881 QoreStringNode* astr = new QoreStringNode(enc);
1882 hashdecl sockaddr_un *addr_un = (hashdecl sockaddr_un *)&addr;
1883 astr->sprintf("UNIX socket: %s", addr_un->sun_path);
1884 o->setValue("source", astr, 0);
1885 o->setValue("source_host", new QoreStringNode("localhost"), 0);
1886 }
1887#endif
1888 }
1889
1891
1896 DLLLOCAL int readByteFromBuffer(char& output) {
1897 // must be checked if open/connected before this function is called
1898 assert(sock != QORE_INVALID_SOCKET);
1899
1900 // always returned buffered data first
1901 if (!buflen) {
1902 return -1;
1903 }
1904
1905 output = *(rbuf + bufoffset);
1906 if (buflen == 1) {
1907 buflen = 0;
1908 bufoffset = 0;
1909 } else {
1910 --buflen;
1911 ++bufoffset;
1912 }
1913 return 0;
1914 }
1915
1916 // buffered reads for high performance
// Buffered receive: hands out data from the internal read buffer (rbuf) and only reads from the
// socket when that buffer is empty.
//
// On success "buf" points into rbuf (not a copy) and the return value is the number of bytes
// available at "buf", at most "bs". A socket read asks for up to DEFAULT_SOCKET_BUFSIZE bytes;
// anything beyond "bs" stays buffered for the next call.
//
// Return values as established by the code below:
//   > 0          number of bytes available at "buf"
//   0            recv() returned 0; the socket is closed here unless errno is EAGAIN/EINPROGRESS
//   QSE_TIMEOUT  no data arrived within "timeout" ms (non-SSL path; a timeout exception is raised
//                unless suppress_exception is set)
//   -1           an exception was raised while waiting for data
//   other <= 0   result of a failed recv() or of ssl->read()
//
// A negative "timeout" skips the wait on the non-SSL path. "do_event" controls whether a read
// event is posted after a socket read; data served from the buffer never posts an event.
1917 DLLLOCAL ssize_t brecv(ExceptionSink* xsink, const char* meth, char*& buf, size_t bs, int flags,
1918 int timeout, bool do_event = true, bool suppress_exception = false) {
1919 assert(xsink);
1920 // must be checked if open/connected before this function is called
1921 assert(sock != QORE_INVALID_SOCKET);
1922 assert(meth);
1923
1924 // always return buffered data first
1925 if (buflen) {
1926 buf = rbuf + bufoffset;
1927 if (buflen <= bs) {
// the request drains the buffer: hand out what is left and rewind
1928 bs = buflen;
1929 buflen = 0;
1930 bufoffset = 0;
1931 } else {
// more buffered than requested: consume "bs" bytes and keep the rest
1932 buflen -= bs;
1933 bufoffset += bs;
1934 }
1935 return (ssize_t)bs;
1936 }
1937
1938 // real socket reads are only done when the buffer is empty
1939
1940 //printd(5, "qore_socket_private::brecv(buf: %p, bs: %d, flags: %d, timeout: %d, do_event: %d) this: %p "
1941 // ssl: %d\n", buf, (int)bs, flags, timeout, (int)do_event, this, ssl);
1942
1943 ssize_t rc;
1944 if (!ssl) {
// wait for data first when a non-negative timeout was given
1945 if (timeout >= 0 && !isDataAvailable(timeout, meth, xsink)) {
1946 if (*xsink) {
1947 return -1;
1948 }
1949 if (!suppress_exception) {
1950 se_timeout("Socket", meth, timeout, xsink);
1951 }
1952 return QSE_TIMEOUT;
1953 }
1954
1955 while (true) {
// errno is cleared so that the checks after the read only see errors from this recv()
1956 errno = 0;
1957 rc = ::recv(sock, rbuf, DEFAULT_SOCKET_BUFSIZE, flags);
1958 if (rc == QORE_SOCKET_ERROR) {
// NOTE(review): the result is discarded; presumably called so that errno reflects the socket
// error on platforms where it is reported differently - confirm
1959 sock_get_error();
// try again if we were interrupted by a signal
1960 if (errno == EINTR) {
1961 continue;
1962 }
1963#ifdef ECONNRESET
1964 if (errno == ECONNRESET) {
1965 if (!suppress_exception) {
1966 se_closed("Socket", meth, xsink);
1967 }
1968 close();
1969 } else
1970#endif
1971 if (!suppress_exception) {
1972 qore_socket_error(xsink, "SOCKET-RECV-ERROR", "error in recv()", meth);
1973 }
1974 break;
1975 }
1976 //printd(5, "qore_socket_private::brecv(%d, %p, %ld, %d) rc: %ld errno: %d\n", sock, buf, bs, flags,
1977 // rc, errno);
1978 // the read completed without an error (rc may be 0): leave the loop
1979 if (rc >= 0) {
1980 break;
1981 }
1982 }
1983 } else {
// SSL connections read through the SSL helper, which receives the timeout itself
1984 rc = ssl->read(xsink, meth, rbuf, DEFAULT_SOCKET_BUFSIZE, timeout, suppress_exception);
1985 }
1986
1987 //printd(5, "qore_socket_private::brecv(%d, %p, %ld, %d) rc: %ld errno: %d\n", sock, buf, bs, flags, rc, errno);
1988 if (rc > 0) {
1989 buf = rbuf;
1990 assert(!buflen);
1991 assert(!bufoffset);
// more data was read than requested: keep the surplus buffered for the next call
1992 if (rc > (ssize_t)bs) {
1993 buflen = rc - bs;
1994 bufoffset = bs;
1995 rc = bs;
1996 }
1997
1998 // register event
1999 if (do_event) {
2000 do_read_event(rc, rc);
2001 }
2002 } else {
2003 // only close the socket if no data was received and the error is not EAGAIN or EINPROGRESS
2004 if (!rc && isOpen() && errno != EAGAIN && errno != EINPROGRESS) {
2005 close();
2006 }
2007 }
2008
2009 return rc;
2010 }
2011
2013 DLLLOCAL QoreStringNode* readHTTPData(ExceptionSink* xsink, const char* meth, int timeout, ssize_t& rc,
2014 bool exit_early = false) {
2015 assert(xsink);
2016 assert(meth);
2017 if (sock == QORE_INVALID_SOCKET) {
2018 se_not_open("Socket", meth, xsink, "readHTTPData");
2019 rc = QSE_NOT_OPEN;
2020 return 0;
2021 }
2022
2023 PrivateQoreSocketThroughputHelper th(this, false);
2024
2025 // state:
2026 // 0 = '\r' received
2027 // 1 = '\r\n' received
2028 // 2 = '\r\n\r' received
2029 // 3 = '\n' received
2030 // read in HHTP header until \r\n\r\n or \n\n from socket
2031 int state = -1;
2033
2034 size_t count = 0;
2035
2036 while (true) {
2037 char* buf;
2038 rc = brecv(xsink, meth, buf, 1, 0, timeout, false);
2039 //printd(5, "qore_socket_private::readHTTPData() this: %p Socket::%s(): rc: %zd read char: %c (%03d) "
2040 // "(old state: %d)\n", this, meth, rc, rc > 0 && buf[0] > 31 ? buf[0] : '?', rc > 0 ? buf[0] : 0, state);
2041 if (rc <= 0) {
2042 //printd(5, "qore_socket_private::readHTTPData(timeout: %d) hdr='%s' (len: %d), rc=" QSD ", "
2043 // "errno: %d: '%s'\n", timeout, hdr->c_str(), hdr->strlen(), rc, errno, strerror(errno));
2044
2045 if (!*xsink) {
2046 if (!count) {
2047 //printd(5, "qore_socket_private::readHTTPData() this: %p rc: %d count: %d (%d) "
2048 // "timeout: %d\n", this, rc, count, hdr->size(), timeout);
2049 se_closed("Socket", meth, xsink);
2050 } else {
2051 xsink->raiseExceptionArg("SOCKET-HTTP-ERROR", hdr.release(), "socket closed on remote end "
2052 "while reading header data after reading " QSD " byte%s", count, count == 1 ? "" : "s");
2053 }
2054 }
2055 return 0;
2056 }
2057 char c = buf[0];
2058 if (++count == QORE_MAX_HEADER_SIZE) {
2059 xsink->raiseException("SOCKET-HTTP-ERROR", "header size cannot exceed " QSD " bytes", count);
2060 return 0;
2061 }
2062
2063 // check if we can progress to the next state
2064 if (c == '\n') {
2065 if (state == -1) {
2066 state = 3;
2067 continue;
2068 }
2069 if (!state) {
2070 if (exit_early && hdr->empty())
2071 return 0;
2072 state = 1;
2073 continue;
2074 }
2075 assert(state > 0);
2076 break;
2077 } else if (c == '\r') {
2078 if (state == -1) {
2079 state = 0;
2080 continue;
2081 }
2082 if (!state)
2083 break;
2084 if (state == 1) {
2085 state = 2;
2086 continue;
2087 }
2088 }
2089
2090 if (state != -1) {
2091 switch (state) {
2092 case 0: hdr->concat('\r'); break;
2093 case 1: hdr->concat("\r\n"); break;
2094 case 2: hdr->concat("\r\n\r"); break;
2095 case 3: hdr->concat('\n'); break;
2096 }
2097 state = -1;
2098 }
2099 hdr->concat(c);
2100 }
2101 hdr->concat('\n');
2102 //printd(5, "qore_socket_private::readHTTPData(timeout: %d) hdr='%s' (%d)\n", timeout, hdr->c_str(),
2103 // hdr->size());
2104 th.finalize(hdr->size());
2105 return hdr.release();
2106 }
2107
2108 DLLLOCAL QoreStringNode* recv(ExceptionSink* xsink, ssize_t bufsize, int timeout, ssize_t& rc,
2109 int source = QORE_SOURCE_SOCKET) {
2110 assert(xsink);
2111 if (sock == QORE_INVALID_SOCKET) {
2112 se_not_open("Socket", "recv", xsink, "recv");
2113 rc = QSE_NOT_OPEN;
2114 return 0;
2115 }
2116 if (in_op >= 0) {
2117 if (in_op == q_gettid()) {
2118 se_in_op("Socket", "recv", xsink);
2119 return 0;
2120 }
2121 se_in_op_thread("Socket", "recv", xsink);
2122 return 0;
2123 }
2124
2125 PrivateQoreSocketThroughputHelper th(this, false);
2126
2127 size_t bs = bufsize > 0 && bufsize < DEFAULT_SOCKET_BUFSIZE ? bufsize : DEFAULT_SOCKET_BUFSIZE;
2128
2130
2131 char* buf;
2132
2133 while (true) {
2134 rc = brecv(xsink, "recv", buf, bs, 0, timeout, false);
2135
2136 if (rc <= 0) {
2137 if (*xsink) {
2138 xsink->appendLastDescription(" (%zd bytes requested; %zu bytes received)", bs, str->size());
2139 }
2140 printd(5, "qore_socket_private::recv(" QSD ", %d) bs=" QSD ", br=" QSD ", rc=" QSD ", errno: %d "
2141 "(%s)\n", bufsize, timeout, bs, str->size(), rc, errno, strerror(errno));
2142 break;
2143 }
2144
2145 str->concat(buf, rc);
2146
2147 // register event
2148 if (source > 0) {
2149 do_read_event(rc, str->size(), bufsize, source);
2150 }
2151
2152 if (bufsize > 0) {
2153 if (str->size() >= (size_t)bufsize)
2154 break;
2155 if ((bufsize - str->size()) < bs)
2156 bs = bufsize - str->size();
2157 }
2158 }
2159
2160 printd(5, "qore_socket_private::recv() received " QSD " byte(s), bufsize=" QSD ", strlen=" QSD " str='%s'\n",
2161 str->size(), bufsize, (str ? str->strlen() : 0), str ? str->c_str() : "n/a");
2162
2163 // "fix" return code value if no error occurred
2164 if (rc >= 0)
2165 rc = str->size();
2166
2167 th.finalize(str->size());
2168
2169 if (*xsink) {
2170 return nullptr;
2171 }
2172
2173 if (source > 0) {
2174 do_data_event(QORE_EVENT_SOCKET_DATA_READ, source, **str);
2175 }
2176 return str.release();
2177 }
2178
2179 DLLLOCAL QoreStringNode* recvAll(ExceptionSink* xsink, int timeout, ssize_t& rc,
2180 int source = QORE_SOURCE_SOCKET) {
2181 assert(xsink);
2182 if (sock == QORE_INVALID_SOCKET) {
2183 se_not_open("Socket", "recv", xsink, "recvAll");
2184 rc = QSE_NOT_OPEN;
2185 return nullptr;
2186 }
2187 if (in_op >= 0) {
2188 if (in_op == q_gettid()) {
2189 se_in_op("Socket", "recv", xsink);
2190 return nullptr;
2191 }
2192 se_in_op_thread("Socket", "recv", xsink);
2193 return nullptr;
2194 }
2195
2196 PrivateQoreSocketThroughputHelper th(this, false);
2197
2199
2200 // perform first read with timeout
2201 char* buf;
2202 rc = brecv(xsink, "recv", buf, DEFAULT_SOCKET_BUFSIZE, 0, timeout, false);
2203 //printd(5, "qore_socket_private::brecv(to: %d) initial: rc=" QSD "\n", timeout, rc);
2204 if (rc <= 0) {
2205 return nullptr;
2206 }
2207
2208 str->concat(buf, rc);
2209
2210 // register event
2211 do_read_event(rc, rc);
2212
2213 // keep reading data until no more data is available without a timeout
2214 if (isDataAvailable(0, "recv", xsink)) {
2215 do {
2216 rc = brecv(xsink, "recv", buf, DEFAULT_SOCKET_BUFSIZE, 0, 0, false, true);
2217 //printd(5, "qore_socket_private::brecv(to: 0) rc=" QSD " rd=" QSD "\n", rc, str->size());
2218 // if the remote end has closed the connection, return what we have
2219 if (rc <= 0) {
2220 break;
2221 }
2222 str->concat(buf, rc);
2223
2224 // register event
2225 do_read_event(rc, str->size());
2226 } while (isDataAvailable(0, "recv", xsink));
2227 }
2228
2229 th.finalize(str->size());
2230
2231 if (*xsink) {
2232 return nullptr;
2233 }
2234
2235 rc = str->size();
2236 if (source > 0) {
2237 do_data_event(QORE_EVENT_SOCKET_DATA_READ, source, **str);
2238 }
2239 return str.release();
2240 }
2241
2242 DLLLOCAL int recv(int fd, ssize_t size, int timeout_ms, ExceptionSink* xsink);
2243
2244 DLLLOCAL BinaryNode* recvBinary(ExceptionSink* xsink, ssize_t bufsize, int timeout, ssize_t& rc,
2245 int source = QORE_SOURCE_SOCKET) {
2246 assert(xsink);
2247 if (sock == QORE_INVALID_SOCKET) {
2248 se_not_open("Socket", "recvBinary", xsink, "recvBinary");
2249 rc = QSE_NOT_OPEN;
2250 return 0;
2251 }
2252 if (in_op >= 0) {
2253 if (in_op == q_gettid()) {
2254 se_in_op("Socket", "recvBinary", xsink);
2255 return 0;
2256 }
2257 se_in_op_thread("Socket", "recvBinary", xsink);
2258 return 0;
2259 }
2260
2261 PrivateQoreSocketThroughputHelper th(this, false);
2262
2263 size_t bs = bufsize > 0 && bufsize < DEFAULT_SOCKET_BUFSIZE ? bufsize : DEFAULT_SOCKET_BUFSIZE;
2264
2266
2267 char* buf;
2268 while (true) {
2269 rc = brecv(xsink, "recvBinary", buf, bs, 0, timeout);
2270 if (rc <= 0)
2271 break;
2272
2273 b->append(buf, rc);
2274
2275 if (bufsize > 0) {
2276 if (b->size() >= (size_t)bufsize)
2277 break;
2278 if ((bufsize - b->size()) < bs)
2279 bs = bufsize - b->size();
2280 }
2281 }
2282
2283 th.finalize(b->size());
2284
2285 if (*xsink)
2286 return nullptr;
2287
2288 // "fix" return code value if no error occurred
2289 if (rc >= 0)
2290 rc = b->size();
2291
2292 if (source > 0) {
2293 do_data_event(QORE_EVENT_SOCKET_DATA_READ, source, **b);
2294 }
2295 printd(5, "qore_socket_private::recvBinary() received " QSD " byte(s), bufsize=" QSD ", blen=" QSD "\n",
2296 b->size(), bufsize, b->size());
2297 return b.release();
2298 }
2299
2300 DLLLOCAL BinaryNode* recvBinaryAll(ExceptionSink* xsink, int timeout, ssize_t& rc,
2301 int source = QORE_SOURCE_SOCKET) {
2302 assert(xsink);
2303 if (sock == QORE_INVALID_SOCKET) {
2304 se_not_open("Socket", "recvBinary", xsink, "recvBinaryAll");
2305 rc = QSE_NOT_OPEN;
2306 return 0;
2307 }
2308 if (in_op >= 0) {
2309 if (in_op == q_gettid()) {
2310 se_in_op("Socket", "recvBinary", xsink);
2311 return 0;
2312 }
2313 se_in_op_thread("Socket", "recvBinary", xsink);
2314 return 0;
2315 }
2316
2317 PrivateQoreSocketThroughputHelper th(this, false);
2318
2320
2321 //printd(5, "QoreSocket::recvBinary(%d, " QSD ") this: %p\n", timeout, rc, this);
2322 // perform first read with timeout
2323 char* buf;
2324 rc = brecv(xsink, "recvBinary", buf, DEFAULT_SOCKET_BUFSIZE, 0, timeout, false);
2325 //printd(5, "qore_socket_private::brecv(to: %d) initial: rc=" QSD "\n", timeout, rc);
2326 if (rc <= 0) {
2327 return nullptr;
2328 }
2329
2330 b->append(buf, rc);
2331
2332 // register event
2333 do_read_event(rc, rc);
2334
2335 // keep reading data until no more data is available without a timeout
2336 while (isDataAvailable(0, "recvBinary", xsink)) {
2337 rc = brecv(xsink, "recvBinary", buf, DEFAULT_SOCKET_BUFSIZE, 0, 0, false, true);
2338 //printd(5, "qore_socket_private::brecv(to: 0) rc=" QSD " rd=" QSD "\n", rc, b->size());
2339 // if the remote end has closed the connection, return what we have
2340 if (rc <= 0) {
2341 break;
2342 }
2343
2344 b->append(buf, rc);
2345
2346 // register event
2347 do_read_event(rc, b->size());
2348 }
2349
2350 th.finalize(b->size());
2351
2352 if (*xsink) {
2353 return nullptr;
2354 }
2355
2356 if (source > 0) {
2357 do_data_event(QORE_EVENT_SOCKET_DATA_READ, source, **b);
2358 }
2359 rc = b->size();
2360 //printd(5, "qore_socket_private() this: %p b: %p size: %lld\n", this, b->getPtr(), rc);
2361 return b.release();
2362 }
2363
2364 DLLLOCAL void recvToOutputStream(OutputStream *os, int64 size, int64 timeout, ExceptionSink *xsink,
2365 QoreThreadLock* l, int source = QORE_SOURCE_SOCKET) {
2366 if (sock == QORE_INVALID_SOCKET) {
2367 se_not_open("Socket", "recvToOutputStream", xsink);
2368 return;
2369 }
2370 if (in_op >= 0) {
2371 if (in_op == q_gettid()) {
2372 se_in_op("Socket", "recvToOutputStream", xsink);
2373 return;
2374 }
2375 se_in_op_thread("Socket", "recvToOutputStream", xsink);
2376 return;
2377 }
2378
2379 qore_socket_op_helper oh(this);
2380
2381 char* buf;
2382 ssize_t br = 0;
2383 while (size < 0 || br < size) {
2384 // calculate bytes needed
2385 int bn = size < 0 ? DEFAULT_SOCKET_BUFSIZE : QORE_MIN(size - br, DEFAULT_SOCKET_BUFSIZE);
2386
2387 ssize_t rc = brecv(xsink, "recvToOutputStream", buf, bn, 0, timeout);
2388 if (rc < 0) {
2389 //error - already reported in xsink
2390 return;
2391 }
2392 if (rc == 0) {
2393 //eof
2394 if (size >= 0) {
2395 //not all size bytes were read
2396 xsink->raiseException("SOCKET-RECV-ERROR", "Unexpected end of stream");
2397 }
2398 return;
2399 }
2400
2401 if (source > 0) {
2402 do_data_event(QORE_EVENT_SOCKET_DATA_READ, source, buf, rc);
2403 }
2404
2405 // write buffer to the stream
2406 {
2407 AutoUnlocker al(l);
2408 os->write(buf, rc, xsink);
2409 if (*xsink) {
2410 return;
2411 }
2412 }
2413
2414 br += rc;
2415 }
2416 }
2417
2418 DLLLOCAL QoreStringNode* readHTTPHeaderString(ExceptionSink* xsink, int timeout, int source) {
2419 assert(xsink);
2420 ssize_t rc;
2421 QoreStringNodeHolder hdr(readHTTPData(xsink, "readHTTPHeaderString", timeout, rc));
2422 if (!hdr) {
2423 assert(*xsink);
2424 return 0;
2425 }
2426 assert(rc > 0);
2427 do_data_event(QORE_EVENT_HTTP_HEADERS_READ, source, **hdr);
2428 return hdr.release();
2429 }
2430
2431 DLLLOCAL QoreHashNode* readHTTPHeader(ExceptionSink* xsink, QoreHashNode* info, int timeout,
2432 ssize_t& rc, int source, const char* headers_raw_key = "headers-raw") {
2433 assert(xsink);
2434 QoreStringNodeHolder hdr(readHTTPData(xsink, "readHTTPHeader", timeout, rc));
2435 if (!hdr) {
2436 assert(*xsink);
2437 return nullptr;
2438 }
2439
2440 if (hdr->empty()) {
2441 xsink->raiseException("SOCKET-HTTP-ERROR", "remote closed the connection while reading the HTTP header");
2442 return nullptr;
2443 }
2444 assert(rc > 0);
2445
2446 return processHttpHeaderString(xsink, hdr, info, source, headers_raw_key);
2447 }
2448
2450 DLLLOCAL QoreHashNode* processHttpHeaderString(ExceptionSink* xsink, QoreStringNodeHolder& hdr,
2451 QoreHashNode* info, int source, const char* headers_raw_key = "headers-raw") {
2452 const char* buf = hdr->c_str();
2453 char* p;
2454 if ((p = (char*)strstr(buf, "\r\n"))) {
2455 *p = '\0';
2456 p += 2;
2457 } else if ((p = (char*)strchr(buf, '\n'))) {
2458 *p = '\0';
2459 ++p;
2460 } else if ((p = (char*)strchr(buf, '\r'))) {
2461 *p = '\0';
2462 ++p;
2463 } else {
2464 // readHTTPData will only return a string that satisifies one of the above conditions,
2465 // however an embedded 0 could have been sent which would make the above searches invalid
2466 xsink->raiseException("SOCKET-HTTP-ERROR", "invalid header received with embedded nulls in "
2467 "Socket::readHTTPHeader()");
2468 return nullptr;
2469 }
2470
2471 char* t1;
2472 if (!(t1 = (char*)strstr(buf, "HTTP/"))) {
2473 xsink->raiseExceptionArg("SOCKET-HTTP-ERROR", hdr.release(), "missing HTTP version string in "
2474 "first header line in Socket::readHTTPHeader()");
2475 return nullptr;
2476 }
2477
2478 ReferenceHolder<QoreHashNode> h(new QoreHashNode(autoTypeInfo), xsink);
2479
2480 // process header flags
2481 int flags = CHF_PROCESS;
2482
2483 // get version
2484 {
2485 QoreStringNode* hv = new QoreStringNode(t1 + 5, 3, enc);
2486 h->setKeyValue("http_version", hv, nullptr);
2487 if (*hv == "1.1") {
2488 flags |= CHF_HTTP11;
2489 }
2490 }
2491
2492 // if we are getting a response
2493 // key for info if applicable
2494 const char* info_key;
2495 if (t1 == buf) {
2496 char* t2 = (char*)strchr(buf + 8, ' ');
2497 if (t2) {
2498 t2++;
2499 if (isdigit(*(t2))) {
2500 h->setKeyValue("status_code", atoi(t2), nullptr);
2501 if (strlen(t2) > 4) {
2502 h->setKeyValue("status_message", new QoreStringNode(t2 + 4), nullptr);
2503 }
2504 }
2505 }
2506 // write the status line as the "response-uri" key in the info hash if present
2507 // NOTE: this is not a URI, so the name is not really appropriate
2508 info_key = "response-uri";
2509 } else { // get method and path
2510 char* t2 = (char*)strchr(buf, ' ');
2511 if (t2) {
2512 *t2 = '\0';
2513 h->setKeyValue("method", new QoreStringNode(buf), nullptr);
2514 t2++;
2515 t1 = strchr(t2, ' ');
2516 if (t1) {
2517 *t1 = '\0';
2518 //printd(5, "found path '%s'\n", t2);
2519 // the path is returned as-is with no decodings - use decode_url() to decode
2520 h->setKeyValue("path", new QoreStringNode(t2, enc), nullptr);
2521 }
2522 }
2523 info_key = "request-uri";
2524 flags |= CHF_REQUEST;
2525 }
2526
2527 // write status line or request line to the info hash and raise a data event if applicable
2528 if (info || (event_queue && event_data)) {
2529 QoreStringNodeHolder status_line(new QoreStringNode(buf));
2530 if (info && event_queue && event_data) {
2531 status_line->ref();
2532 }
2533 if (event_queue && event_data) {
2534 do_data_event_intern(QORE_EVENT_SOCKET_DATA_READ, source, **status_line);
2535 }
2536 if (info) {
2537 info->setKeyValue(info_key, *status_line, nullptr);
2538 }
2539 status_line.release();
2540 }
2541
2542 bool close = convertHeaderToHash(*h, p, flags, info, &http_exp_chunked_body, headers_raw_key);
2543 do_read_http_header(QORE_EVENT_HTTP_MESSAGE_RECEIVED, *h, source);
2544
2545 // process header info
2546 if ((flags & CHF_REQUEST) && info) {
2547 info->setKeyValue("close", close, 0);
2548 }
2549
2550 return h.release();
2551 }
2552
2553 // info must be already referenced for the assignment, if present
2554 DLLLOCAL int runHeaderCallback(ExceptionSink* xsink, const char* cname, const char* mname,
2555 const ResolvedCallReferenceNode& callback, QoreThreadLock* l, const QoreHashNode* hdr, QoreHashNode* info,
2556 bool send_aborted = false, QoreObject* obj = nullptr) {
2557 assert(xsink);
2558 assert(obj);
2559 ReferenceHolder<QoreListNode> args(new QoreListNode(autoTypeInfo), xsink);
2560 QoreHashNode* arg = new QoreHashNode(autoTypeInfo);
2561 arg->setKeyValue("hdr", hdr ? hdr->refSelf() : nullptr, xsink);
2562 arg->setKeyValue("info", info, xsink);
2563 if (obj)
2564 arg->setKeyValue("obj", obj->refSelf(), xsink);
2565 arg->setKeyValue("send_aborted", send_aborted, xsink);
2566 args->push(arg, nullptr);
2567
2568 ValueHolder rv(xsink);
2569 return runCallback(xsink, cname, mname, rv, callback, l, *args);
2570 }
2571
2572 DLLLOCAL int runTrailerCallback(ExceptionSink* xsink, const char* cname, const char* mname,
2574 ValueHolder rv(xsink);
2575 if (runCallback(xsink, cname, mname, rv, callback, l, nullptr))
2576 return -1;
2577
2578 switch (rv->getType()) {
2579 case NT_NOTHING:
2580 break;
2581 case NT_HASH: {
2582 hdr = rv.release().get<QoreHashNode>();
2583 break;
2584 }
2585 default:
2586 xsink->raiseException("HTTP-TRAILER-ERROR", "chunked callback returned type '%s'; expecting 'hash' "
2587 "or 'NOTHING'", rv->getTypeName());
2588 return -1;
2589 }
2590 return 0;
2591 }
2592
2593 DLLLOCAL int runDataCallback(ExceptionSink* xsink, const char* cname, const char* mname,
2594 const ResolvedCallReferenceNode& callback, QoreThreadLock* l, const AbstractQoreNode* data, bool chunked) {
2595 assert(xsink);
2596 ReferenceHolder<QoreListNode> args(new QoreListNode(autoTypeInfo), xsink);
2597 QoreHashNode* arg = new QoreHashNode(autoTypeInfo);
2598 arg->setKeyValue("data", data->realCopy(), xsink);
2599 arg->setKeyValue("chunked", chunked, xsink);
2600 args->push(arg, nullptr);
2601
2602 ValueHolder rv(xsink);
2603 return runCallback(xsink, cname, mname, rv, callback, l, *args);
2604 }
2605
2606 DLLLOCAL int runCallback(ExceptionSink* xsink, const char* cname, const char* mname, ValueHolder& res,
2607 const ResolvedCallReferenceNode& callback, QoreThreadLock* l, const QoreListNode* args = nullptr) {
2608 assert(xsink);
2609 // FIXME: subtract callback execution time from socket performance measurement
2610
2611 // unlock and execute callback
2612 {
2613 AutoUnlocker al(l);
2614 res = callback.execValue(args, xsink);
2615 }
2616
2617 // check exception and socket status
2618 assert(xsink);
2619 return *xsink ? -1 : 0;
2620 }
2621
2622 DLLLOCAL int sendHttpChunkedWithCallback(ExceptionSink* xsink, const char* cname, const char* mname,
2623 const ResolvedCallReferenceNode& send_callback, QoreThreadLock& l, int source, int timeout_ms = -1,
2624 bool* aborted = nullptr) {
2625 assert(xsink);
2626 assert(!aborted || !(*aborted));
2627
2628 if (sock == QORE_INVALID_SOCKET) {
2629 se_not_open(cname, mname, xsink, "sendHttpChunkedWithCallback");
2630 return QSE_NOT_OPEN;
2631 }
2632 if (in_op >= 0) {
2633 if (in_op == q_gettid()) {
2634 se_in_op(cname, mname, xsink);
2635 return 0;
2636 }
2637 se_in_op_thread(cname, mname, xsink);
2638 return 0;
2639 }
2640
2641 PrivateQoreSocketThroughputHelper th(this, true);
2642
2643 // set the non-blocking flag (for use with non-ssl connections)
2644 bool nb = (timeout_ms >= 0);
2645 // set non-blocking I/O (and restore on exit) if we have a timeout and a non-ssl connection
2646 OptionalNonBlockingHelper onbh(*this, !ssl && nb, xsink);
2647 if (*xsink)
2648 return -1;
2649
2650 qore_socket_op_helper oh(this);
2651
2652 ssize_t rc;
2653 int64 total = 0;
2654 bool done = false;
2655
2656 while (!done) {
2657 // if we have response data already, then we assume an error and abort
2658 if (aborted) {
2659 bool data_available = tryReadSocketData(mname, xsink);
2660 //printd(5, "qore_socket_private::sendHttpChunkedWithCallback() this: %p aborted: %p iDA: %d\n", this, aborted, data_available);
2661 if (data_available || *xsink) {
2662 *aborted = true;
2663 return *xsink ? -1 : 0;
2664 }
2665 }
2666
2667 // FIXME: subtract callback execution time from socket performance measurement
2668 ValueHolder res(xsink);
2669 rc = runCallback(xsink, cname, mname, res, send_callback, &l);
2670 if (rc)
2671 return rc;
2672
2673 //printd(5, "qore_socket_private::sendHttpChunkedWithCallback() this: %p res: %s\n", this, get_type_name(*res));
2674
2675 // check callback return val
2676 QoreString buf;
2677 // do not copy data here; set references and send the data directly
2678 const char* data_ptr = nullptr;
2679 size_t data_size = 0;
2680
2681 switch (res->getType()) {
2682 case NT_STRING: {
2683 const QoreStringNode* str = res->get<const QoreStringNode>();
2684 if (str->empty()) {
2685 done = true;
2686 break;
2687 }
2688 buf.sprintf("%x\r\n", (int)str->size());
2689 data_ptr = str->c_str();
2690 data_size = str->size();
2691 //buf.concat(str->c_str(), str->size());
2692 break;
2693 }
2694
2695 case NT_BINARY: {
2696 const BinaryNode* b = res->get<const BinaryNode>();
2697 if (b->empty()) {
2698 done = true;
2699 break;
2700 }
2701 buf.sprintf("%x\r\n", (int)b->size());
2702 data_ptr = static_cast<const char*>(b->getPtr());
2703 data_size = b->size();
2704 //buf.concat((const char*)b->getPtr(), b->size());
2705 break;
2706 }
2707
2708 case NT_HASH: {
2709 buf.concat("0\r\n");
2710
2711 const QoreHashNode* h = res->get<const QoreHashNode>();
2712 ConstHashIterator hi(h);
2713 while (hi.next()) {
2714 const QoreValue v = hi.get();
2715 const char* key = hi.getKey();
2716
2717 //printd(5, "qore_socket_private::sendHttpChunkedWithCallback() this: %p trailer %s\n", this, key);
2718
2719 if (v.getType() == NT_LIST) {
2720 ConstListIterator li(v.get<const QoreListNode>());
2721 while (li.next())
2722 do_header(key, buf, li.getValue());
2723 } else
2724 do_header(key, buf, v);
2725 }
2726 // fall through to next case
2727 }
2728
2729 case NT_NOTHING:
2730 case NT_NULL:
2731 done = true;
2732 break;
2733
2734 default:
2735 xsink->raiseException("SOCKET-CALLBACK-ERROR", "HTTP chunked data callback returned type '%s'; expecting one of: 'string', 'binary', 'hash', 'nothing' (or 'NULL')", res->getTypeName());
2736 return -1;
2737 }
2738
2739 // send chunk buffer data
2740 if (!buf.empty()) {
2741 rc = sendIntern(xsink, cname, mname, buf.c_str(), buf.size(), timeout_ms, total, true);
2742 }
2743
2744 if (!*xsink) {
2745 assert(rc >= 0);
2746 // send actual data, if available
2747 if (data_ptr && data_size) {
2748 rc = sendIntern(xsink, cname, mname, data_ptr, data_size, timeout_ms, total, true);
2749 }
2750
2751 if (!*xsink) {
2752 assert(rc >= 0);
2753 if (buf.empty() && (!data_ptr || !data_size)) {
2754 buf.set("0\r\n\r\n");
2755 } else {
2756 buf.set("\r\n");
2757 }
2758 rc = sendIntern(xsink, cname, mname, buf.c_str(), buf.size(), timeout_ms, total, true);
2759 }
2760 }
2761
2762 if (!*xsink) {
2763 // do events
2764 switch (res->getType()) {
2765 case NT_STRING: {
2766 const QoreStringNode* str = res->get<const QoreStringNode>();
2767 if (!str->empty()) {
2768 do_data_event(QORE_EVENT_HTTP_CHUNKED_DATA_SENT, source, *str);
2769 }
2770 break;
2771 }
2772
2773 case NT_BINARY: {
2774 const BinaryNode* b = res->get<const BinaryNode>();
2775 if (!b->empty()) {
2776 do_data_event(QORE_EVENT_HTTP_CHUNKED_DATA_SENT, source, *b);
2777 }
2778 break;
2779 }
2780
2781 case NT_HASH: {
2782 const QoreHashNode* h = res->get<const QoreHashNode>();
2783 do_header_event(QORE_EVENT_HTTP_FOOTERS_SENT, source, *h);
2784 break;
2785 }
2786 }
2787 }
2788
2789 if (rc < 0) {
2790 // if we have a socket I/O error, but also data to be read on the socket, then clear the exception and return 0
2791 if (aborted && *xsink) {
2792 bool data_available = tryReadSocketData(mname, xsink);
2793 //printd(5, "qore_socket_private::sendHttpChunkedWithCallback() this: %p aborted: %p iDA: %d\n", this, aborted, data_available);
2794 if (data_available) {
2795 *aborted = true;
2796 return *xsink ? -1 : 0;
2797 }
2798 }
2799
2800 //printd(5, "qore_socket_private::sendHttpChunkedWithCallback() this: %p rc: %d sock: %d xsink: %d\n", this, rc, sock, xsink->isException());
2801 }
2802
2803 //printd(5, "qore_socket_private::sendHttpChunkedWithCallback() this: %p sent: %s\n", this, buf.c_str());
2804
2805 if (rc < 0 || sock == QORE_INVALID_SOCKET)
2806 break;
2807 }
2808
2809 th.finalize(total);
2810
2811 return rc < 0 || sock == QORE_INVALID_SOCKET ? -1 : 0;
2812 }
2813
2814 // return: the number of bytes sent or an error (< 0)
2815 DLLLOCAL ssize_t sendIntern(ExceptionSink* xsink, const char* cname, const char* mname, const char* buf, size_t size,
2816 int timeout_ms, int64& total, bool stream = false) {
2817 assert(xsink);
2818 ssize_t rc;
2819 size_t bs = 0;
2820
2821 // set the non-blocking flag (for use with non-ssl connections)
2822 bool nb = (timeout_ms >= 0);
2823
2824 while (true) {
2825 if (ssl) {
2826 // SSL_MODE_ENABLE_PARTIAL_WRITE is enabled so we can get finer-grained socket events for do_send_event() below
2827 rc = ssl->write(mname, buf + bs, size - bs, timeout_ms, xsink);
2828 } else {
2829 while (true) {
2830 rc = ::send(sock, buf + bs, size - bs, 0);
2831 //printd(5, "qore_socket_private::send() this: %p Socket::%s() buf: %p size: %zu timeout_ms: %d ssl: %p nb: %d bs: %zu" rc: %zd\n", this, mname, buf, size, timeout_ms, ssl, nb, bs, rc);
2832 // try again if we were interrupted by a signal
2833 if (rc >= 0)
2834 break;
2835 sock_get_error();
2836 // check that the send finishes before the timeout if we are using non-blocking I/O
2837 if (nb && (errno == EAGAIN
2838#ifdef EWOULDBLOCK
2839 || errno == EWOULDBLOCK
2840#endif
2841 )) {
2842 if (!isWriteFinished(timeout_ms, mname, xsink)) {
2843 if (*xsink)
2844 return -1;
2845 se_timeout("Socket", mname, timeout_ms, xsink);
2846 rc = QSE_TIMEOUT;
2847 break;
2848 }
2849 continue;
2850 }
2851 if (errno != EINTR) {
2852 //printd(5, "qore_socket_private::send() bs: %ld rc: " QSD " len: " QSD " (total: " QSD ") errno: %d sock: %d\n", bs, rc, size - bs, size, errno, sock);
2853 xsink->raiseErrnoException("SOCKET-SEND-ERROR", errno, "error while executing %s::%s()", cname, mname);
2854
2855 // do not close the socket even if we have EPIPE or ECONNRESET in case there is data to be read when streaming
2856#ifdef EPIPE
2857 if (!stream && errno == EPIPE) {
2858 close();
2859 }
2860#endif
2861#ifdef ECONNRESET
2862 if (!stream && errno == ECONNRESET) {
2863 close();
2864 }
2865#endif
2866 break;
2867 }
2868 }
2869 }
2870
2871 if (rc > 0) {
2872 total += rc;
2873 }
2874 //printd(5, "qore_socket_private::send() bs: %ld rc: " QSD " len: " QSD " (total: " QSD ") errno: %d\n", bs, rc, size - bs, size, errno);
2875 if (rc < 0 || sock == QORE_INVALID_SOCKET) {
2876 break;
2877 }
2878 bs += rc;
2879 do_send_event(rc, bs, size);
2880 if (bs >= size) {
2881 break;
2882 }
2883 }
2884
2885 return (ssize_t)bs;
2886 }
2887
2888 DLLLOCAL int send(int fd, ssize_t size, int timeout_ms, ExceptionSink* xsink);
2889
2890 // returns: 0 = OK
2891 DLLLOCAL int send(ExceptionSink* xsink, const char* cname, const char* mname, const char* buf, size_t size,
2892 int timeout_ms = -1, int source = QORE_SOURCE_SOCKET) {
2893 assert(xsink);
2894 if (sock == QORE_INVALID_SOCKET) {
2895 se_not_open(cname, mname, xsink, "send");
2896 return QSE_NOT_OPEN;
2897 }
2898 if (in_op >= 0) {
2899 if (in_op == q_gettid()) {
2900 se_in_op(cname, mname, xsink);
2901 return 0;
2902 }
2903 se_in_op_thread(cname, mname, xsink);
2904 return 0;
2905 }
2906 if (!size) {
2907 return 0;
2908 }
2909
2910 PrivateQoreSocketThroughputHelper th(this, true);
2911
2912 // set the non-blocking flag (for use with non-ssl connections)
2913 bool nb = (timeout_ms >= 0);
2914 // set non-blocking I/O (and restore on exit) if we have a timeout and a non-ssl connection
2915 OptionalNonBlockingHelper onbh(*this, !ssl && nb, xsink);
2916 if (*xsink) {
2917 return -1;
2918 }
2919
2920 int64 total = 0;
2921 ssize_t rc = sendIntern(xsink, cname, mname, buf, size, timeout_ms, total);
2922 th.finalize(total);
2923
2924 if (rc > 0 && source > 0) {
2925 do_data_event(QORE_EVENT_SOCKET_DATA_SENT, source, buf, size);
2926 }
2927
2928 return rc < 0 || sock == QORE_INVALID_SOCKET ? rc : 0;
2929 }
2930
2931 DLLLOCAL void sendFromInputStream(InputStream *is, int64 size, int64 timeout, ExceptionSink *xsink,
2932 QoreThreadLock* l) {
2933 if (sock == QORE_INVALID_SOCKET) {
2934 se_not_open("Socket", "sendFromInputStream", xsink);
2935 return;
2936 }
2937 if (in_op >= 0) {
2938 if (in_op == q_gettid()) {
2939 se_in_op("Socket", "sendFromInputStream", xsink);
2940 return;
2941 }
2942 se_in_op_thread("Socket", "sendFromInputStream", xsink);
2943 return;
2944 }
2945
2946 qore_socket_op_helper oh(this);
2947
2948 PrivateQoreSocketThroughputHelper th(this, true);
2949
2950 // set the non-blocking flag (for use with non-ssl connections)
2951 bool nb = (timeout >= 0);
2952 // set non-blocking I/O (and restore on exit) if we have a timeout and a non-ssl connection
2953 OptionalNonBlockingHelper onbh(*this, !ssl && nb, xsink);
2954 if (*xsink)
2955 return;
2956
2957 char buf[DEFAULT_SOCKET_BUFSIZE];
2958 int64 sent = 0;
2959 int64 total = 0;
2960 while (size < 0 || sent < size) {
2961 int64 toRead = size < 0 ? DEFAULT_SOCKET_BUFSIZE : QORE_MIN(size - sent, DEFAULT_SOCKET_BUFSIZE);
2962 int64 r;
2963 {
2964 AutoUnlocker al(l);
2965 r = is->read(buf, toRead, xsink);
2966 if (*xsink) {
2967 return;
2968 }
2969 }
2970 if (r == 0) {
2971 //eof
2972 if (size >= 0) {
2973 //not all size bytes were sent
2974 xsink->raiseException("SOCKET-SEND-ERROR", "Unexpected end of stream");
2975 return;
2976 }
2977 break;
2978 }
2979
2980 ssize_t rc = sendIntern(xsink, "Socket", "sendFromInputStream", buf, r, timeout, total);
2981 if (rc < 0) {
2982 return;
2983 }
2984 do_data_event(QORE_EVENT_SOCKET_DATA_SENT, QORE_SOURCE_SOCKET, buf, r);
2985 sent += r;
2986 }
2987 th.finalize(total);
2988 }
2989
2990 DLLLOCAL void sendHttpChunkedBodyFromInputStream(InputStream* is, size_t max_chunk_size, int timeout, ExceptionSink* xsink, QoreThreadLock* l, const ResolvedCallReferenceNode* trailer_callback) {
2991 if (sock == QORE_INVALID_SOCKET) {
2992 se_not_open("Socket", "sendHttpChunkedBodyFromInputStream", xsink);
2993 return;
2994 }
2995 if (in_op >= 0) {
2996 if (in_op == q_gettid()) {
2997 se_in_op("Socket", "sendHttpChunkedBodyFromInputStream", xsink);
2998 return;
2999 }
3000 se_in_op_thread("Socket", "sendHttpChunkedBodyFromInputStream", xsink);
3001 return;
3002 }
3003
3004 qore_socket_op_helper oh(this);
3005
3006 PrivateQoreSocketThroughputHelper th(this, true);
3007
3008 // set the non-blocking flag (for use with non-ssl connections)
3009 bool nb = (timeout >= 0);
3010 // set non-blocking I/O (and restore on exit) if we have a timeout and a non-ssl connection
3011 OptionalNonBlockingHelper onbh(*this, !ssl && nb, xsink);
3012 if (*xsink)
3013 return;
3014
3016 // reserve enough space for the maximum size of the buffer + HTTP overhead
3017 buf->preallocate(max_chunk_size);
3018 int64 total = 0;
3019 while (true) {
3020 int64 r;
3021 {
3022 AutoUnlocker al(l);
3023 r = is->read((void*)buf->getPtr(), sizeof(max_chunk_size), xsink);
3024 if (*xsink)
3025 return;
3026 }
3027
3028 // send HTTP chunk prelude with chunk size
3029 QoreString str;
3030 str.sprintf("%x\r\n", (int)r);
3031 int rc = sendIntern(xsink, "Socket", "sendHttpChunkedBodyFromInputStream", str.c_str(), str.size(), timeout, total, true);
3032 if (rc < 0)
3033 return;
3034
3035 bool trailers = false;
3036
3037 // send chunk data, if any
3038 if (r) {
3039 rc = sendIntern(xsink, "Socket", "sendHttpChunkedBodyFromInputStream", (const char*)buf->getPtr(), r, timeout, total, true);
3040 if (rc < 0)
3041 return;
3042 do_data_event(QORE_EVENT_HTTP_CHUNKED_DATA_SENT, QORE_SOURCE_SOCKET, buf->getPtr(), r);
3043 } else if (trailer_callback) {
3044 // get and send chunk trailers, if any
3046
3047 if (runTrailerCallback(xsink, "Socket", "sendHttpChunkedBodyFromInputStream", *trailer_callback, l, h))
3048 return;
3049 if (h) {
3050 str.clear();
3051 do_headers(str, *h, 0, false);
3052
3053 rc = sendIntern(xsink, "Socket", "sendHttpChunkedBodyFromInputStream", str.c_str(), str.size(), timeout, total, true);
3054 if (rc < 0)
3055 return;
3056
3057 do_header_event(QORE_EVENT_HTTP_FOOTERS_SENT, QORE_SOURCE_SOCKET, **h);
3058 trailers = true;
3059 }
3060 }
3061
3062 // close chunk if we sent no trailers
3063 if (!trailers) {
3064 str.set("\r\n");
3065 rc = sendIntern(xsink, "Socket", "sendHttpChunkedBodyFromInputStream", str.c_str(), str.size(), timeout, total, true);
3066 if (rc < 0)
3067 return;
3068 }
3069
3070 if (!r) {
3071 // end of stream
3072 break;
3073 }
3074 }
3075 th.finalize(total);
3076 }
3077
3078 DLLLOCAL void sendHttpChunkedBodyTrailer(const QoreHashNode* headers, int timeout, ExceptionSink* xsink) {
3079 if (sock == QORE_INVALID_SOCKET) {
3080 se_not_open("Socket", "sendHttpChunkedBodyTrailer", xsink);
3081 return;
3082 }
3083 if (in_op >= 0) {
3084 if (in_op == q_gettid()) {
3085 se_in_op("Socket", "sendHttpChunkedBodyTrailer", xsink);
3086 return;
3087 }
3088 se_in_op_thread("Socket", "sendHttpChunkedBodyTrailer", xsink);
3089 return;
3090 }
3091
3092 QoreString buf;
3093 if (!headers) {
3094 ConstHashIterator hi(headers);
3095
3096 while (hi.next()) {
3097 const QoreValue v = hi.get();
3098 const char* key = hi.getKey();
3099
3100 if (v.getType() == NT_LIST) {
3101 ConstListIterator li(v.get<const QoreListNode>());
3102 while (li.next())
3103 do_header(key, buf, li.getValue());
3104 }
3105 else
3106 do_header(key, buf, v);
3107 }
3108 }
3109 buf.concat("\r\n");
3110 int64 total;
3111 sendIntern(xsink, "Socket", "sendHttpChunkedBodyTrailer", buf.c_str(), buf.size(), timeout, total, true);
3112 if (!*xsink) {
3113 do_header_event(QORE_EVENT_HTTP_FOOTERS_SENT, QORE_SOURCE_SOCKET, *headers);
3114 }
3115 }
3116
3117 DLLLOCAL void getSendHttpMessageHeaders(QoreString& hdr, QoreHashNode* info, const char* method, const char* path,
3118 const char* http_version, const QoreHashNode* headers, size_t size, int source) {
3119 // prepare header string
3120 hdr.sprintf("%s %s HTTP/%s", method, path && path[0] ? path : "/", http_version);
3121
3122 // write request-uri key if info hash is non-null
3123 if (info) {
3124 info->setKeyValue("request-uri", new QoreStringNode(hdr), nullptr);
3125 }
3126
3127 getSendHttpMessageHeadersCommon(hdr, info, headers, size, source);
3128 }
3129
3130 DLLLOCAL void getSendHttpMessageHeadersCommon(QoreString& hdr, QoreHashNode* info, const QoreHashNode* headers,
3131 size_t size, int source) {
3132 // send event
3133 do_send_http_message_event(hdr, headers, source);
3134
3135 // add headers
3136 hdr.concat("\r\n");
3137 // insert headers
3138 do_headers(hdr, headers, size);
3139 }
3140
3141 DLLLOCAL int sendHttpMessage(ExceptionSink* xsink, QoreHashNode* info, const char* cname, const char* mname,
3142 const char* method, const char* path, const char* http_version, const QoreHashNode* headers,
3143 const QoreStringNode* body, const void* data, size_t size,
3144 const ResolvedCallReferenceNode* send_callback, InputStream* input_stream, size_t max_chunk_size,
3145 const ResolvedCallReferenceNode* trailer_callback, int source, int timeout_ms = -1,
3146 QoreThreadLock* l = nullptr, bool* aborted = nullptr) {
3147 // prepare header string
3148 QoreString hdr(enc);
3149
3150 hdr.sprintf("%s %s HTTP/%s", method, path && path[0] ? path : "/", http_version);
3151
3152 // write request-uri key if info hash is non-null
3153 if (info) {
3154 info->setKeyValue("request-uri", new QoreStringNode(hdr), nullptr);
3155 }
3156
3157 return sendHttpMessageCommon(xsink, hdr, info, cname, mname, headers, body, data, size, send_callback,
3158 input_stream, max_chunk_size, trailer_callback, source, timeout_ms, l, aborted);
3159 }
3160
3161 DLLLOCAL int sendHttpResponse(ExceptionSink* xsink, QoreHashNode* info, const char* cname, const char* mname,
3162 int code, const char* desc, const char* http_version, const QoreHashNode* headers, const QoreStringNode* body,
3163 const void* data, size_t size, const ResolvedCallReferenceNode* send_callback, InputStream* input_stream,
3164 size_t max_chunk_size, const ResolvedCallReferenceNode* trailer_callback, int source, int timeout_ms = -1,
3165 QoreThreadLock* l = nullptr, bool* aborted = nullptr) {
3166 // prepare header string
3167 QoreString hdr(enc);
3168
3169 // write HTTP response status line
3170 hdr.sprintf("HTTP/%s %03d %s", http_version, code, desc);
3171
3172 // write the status line as the "response-uri" key if info hash is non-null
3173 // NOTE: this is not a URI, so the name is not really appropriate
3174 if (info) {
3175 info->setKeyValue("response-uri", new QoreStringNode(hdr), nullptr);
3176 }
3177
3178 return sendHttpMessageCommon(xsink, hdr, info, cname, mname, headers, body, data, size, send_callback,
3179 input_stream, max_chunk_size, trailer_callback, source, timeout_ms, l, aborted);
3180 }
3181
3182 DLLLOCAL int sendHttpMessageCommon(ExceptionSink* xsink, QoreString& hdr, QoreHashNode* info, const char* cname,
3183 const char* mname, const QoreHashNode* headers, const QoreStringNode* body, const void* data,
3184 size_t size, const ResolvedCallReferenceNode* send_callback, InputStream* input_stream,
3185 size_t max_chunk_size, const ResolvedCallReferenceNode* trailer_callback, int source, int timeout_ms = -1,
3186 QoreThreadLock* l = nullptr, bool* aborted = nullptr) {
3187 assert(xsink);
3188 assert(!(data && send_callback));
3189 assert(!(data && input_stream));
3190 assert(!(send_callback && input_stream));
3191
3192 // send event
3193 do_send_http_message_event(hdr, headers, source);
3194
3195 // add headers
3196 hdr.concat("\r\n");
3197 // insert headers
3198 do_headers(hdr, headers, size && data ? size : 0);
3199
3200 //printd(5, "qore_socket_private::sendHttpMessage() hdr: %s\n", hdr.c_str());
3201
3202 // send URI and headers
3203 int rc;
3204 if ((rc = send(xsink, cname, mname, hdr.c_str(), hdr.size(), timeout_ms, -1)))
3205 return rc;
3206
3207 // header message sent above with do_sent_http_message_event()
3208 if (size && data) {
3209 int rc = send(xsink, cname, mname, (char*)data, size, timeout_ms, -1);
3210 if (!rc) {
3211 if (body) {
3212 do_data_event(QORE_EVENT_SOCKET_DATA_SENT, source, *body);
3213 } else {
3214 do_data_event(QORE_EVENT_SOCKET_DATA_SENT, source, data, size);
3215 }
3216 }
3217 return rc;
3218 } else if (send_callback) {
3219 assert(l);
3220 assert(!aborted || !(*aborted));
3221 return sendHttpChunkedWithCallback(xsink, cname, mname, *send_callback, *l, source, timeout_ms, aborted);
3222 } else if (input_stream) {
3223 assert(l);
3224 sendHttpChunkedBodyFromInputStream(input_stream, max_chunk_size, timeout_ms, xsink, l, trailer_callback);
3225 return *xsink ? -1 : 0;
3226 }
3227
3228 return 0;
3229 }
3230
// reads an HTTP chunked body from the socket as binary data
// - chunk data is written to "os" when "os" is given, otherwise it is appended to a BinaryNode
// - when "recv_callback" is given and "os" is not, runDataCallback() is called with the buffered data after
//   each chunk and the buffer is cleared
// - when "read_once" is true, the call returns after the first non-empty chunk with the data under "body"
// - after the zero-size chunk, the remaining data is read with readHTTPData() and, unless it is only 2-4 bytes
//   long, parsed as footers with convertHeaderToHash()
// returns a hash ("body" is set only when neither "recv_callback" nor "os" is used); returns 0 / nullptr when
// an exception was raised and also when "recv_callback" is given
3231 DLLLOCAL QoreHashNode* readHttpChunkedBodyBinary(int timeout, ExceptionSink* xsink, const char* cname, int source,
3232 const ResolvedCallReferenceNode* recv_callback = nullptr, QoreThreadLock* l = nullptr,
3233 QoreObject* obj = nullptr, OutputStream* os = nullptr, bool read_once = false) {
3234 assert(xsink);
3235
3236 if (sock == QORE_INVALID_SOCKET) {
3237 se_not_open(cname, "readHTTPChunkedBodyBinary", xsink);
3238 return 0;
3239 }
3240 if (in_op >= 0) {
3241 if (in_op == q_gettid()) {
3242 se_in_op(cname, "readHTTPChunkedBodyBinary", xsink);
3243 return 0;
3244 }
3245 se_in_op_thread(cname, "readHTTPChunkedBodyBinary", xsink);
3246 return 0;
3247 }
3248
3249 // reset "expecting HTTP chunked body" flag
3250 if (http_exp_chunked_body)
3251 http_exp_chunked_body = false;
3252
3253 qore_socket_op_helper oh(this);
3254
// no buffer is allocated when the data goes to an output stream
3255 SimpleRefHolder<BinaryNode> b(os ? nullptr : new BinaryNode);
3256 QoreString str; // for reading the size of each chunk
3257
3258 ssize_t rc;
3259 // read the size then read the data and append to buffer
3260 while (true) {
3261 // state = 0, nothing
3262 // state = 1, \r received
3263 int state = 0;
// read the chunk-size line one byte at a time until "\r\n" is seen
3264 while (true) {
3265 char* buf;
3266 rc = brecv(xsink, "readHTTPChunkedBodyBinary", buf, 1, 0, timeout, false);
3267 if (rc <= 0) {
3268 if (!*xsink) {
3269 assert(!rc);
3270 se_closed(cname, "readHTTPChunkedBodyBinary", xsink);
3271 }
3272 return 0;
3273 }
3274
3275 char c = buf[0];
3276
3277 if (!state && c == '\r')
3278 state = 1;
3279 else if (state && c == '\n')
3280 break;
3281 else {
// a '\r' that was not followed by '\n' is kept as data
3282 if (state) {
3283 state = 0;
3284 str.concat('\r');
3285 }
3286 str.concat(c);
3287 }
3288 }
3289 // DEBUG
3290 //printd(5, "QoreSocket::readHTTPChunkedBodyBinary(): got chunk size (" QSD " bytes) string: %s\n",
3291 // str.strlen(), str.c_str());
3292
3293 // terminate string at ';' char if present
3294 char* p = (char*)strchr(str.c_str(), ';');
3295 if (p)
3296 *p = '\0';
// the chunk size is parsed as a hexadecimal number
3297 long size = strtol(str.c_str(), 0, 16);
3298 do_chunked_read(QORE_EVENT_HTTP_CHUNK_SIZE, size, str.size(), source);
3299
// a zero size ends the chunk loop
3300 if (!size) {
3301 break;
3302 }
3303
3304 if (size < 0) {
3305 xsink->raiseException("READ-HTTP-CHUNK-ERROR", "negative value given for chunk size (%ld)", size);
3306 return 0;
3307 }
3308
3309 // prepare string for chunk
3310 //str.allocate(size + 1);
3311
// request at most DEFAULT_SOCKET_BUFSIZE bytes per read
3312 ssize_t bs = size < DEFAULT_SOCKET_BUFSIZE ? size : DEFAULT_SOCKET_BUFSIZE;
3313 ssize_t br = 0; // bytes received
3314 while (true) {
3315 char* buf;
3316 rc = brecv(xsink, "readHTTPChunkedBodyBinary", buf, bs, 0, timeout, false);
3317 //printd(5, "qore_socket_private::readHTTPChunkedBodyBinary() str: '%s' bs: %lld rc: %lld b: %p "
3318 // "(%lld) recv_callback: %p\n", str.c_str(), bs, rc, *b, b->size(), recv_callback);
3319 if (rc <= 0) {
3320 if (!*xsink) {
3321 assert(!rc);
3322 se_closed(cname, "readHTTPChunkedBodyBinary", xsink);
3323 }
3324 return nullptr;
3325 }
3326
3327 do_data_event(QORE_EVENT_HTTP_CHUNKED_DATA_READ, source, buf, (size_t)rc);
3328
3329 if (os) {
// "l" is handed to an AutoUnlocker for the duration of the stream write
3330 AutoUnlocker al(l);
3331 os->write(buf, rc, xsink);
3332 if (*xsink)
3333 return nullptr;
3334 } else {
3335 b->append(buf, rc);
3336 }
3337 br += rc;
3338
3339 if (br >= size) {
3340 break;
3341 }
// shrink the request so that no more than the rest of the chunk is asked for
3342 if (size - br < bs) {
3343 bs = size - br;
3344 }
3345 }
3346
3347 // DEBUG
3348 //printd(5, "QoreSocket::readHTTPChunkedBodyBinary(): received binary chunk: size: %d br=" QSD " total=" QSD "\n", size, br, b->size());
3349
3350 // read crlf after chunk
3351 // FIXME: bytes read are not checked if they equal CRLF
3352 br = 0;
3353 while (br < 2) {
3354 char* buf;
3355 rc = brecv(xsink, "readHTTPChunkedBodyBinary", buf, 2 - br, 0, timeout, false);
3356 if (rc <= 0) {
3357 if (!*xsink) {
3358 assert(!rc);
3359 se_closed(cname, "readHTTPChunkedBodyBinary", xsink);
3360 }
3361 return nullptr;
3362 }
3363 br += rc;
3364 }
3365
3366 do_chunked_read(QORE_EVENT_HTTP_CHUNKED_DATA_RECEIVED, size, size + 2, source);
3367
// hand the chunk to the callback and start the next chunk with an empty buffer
3368 if (recv_callback && !os) {
3369 if (runDataCallback(xsink, cname, "readHTTPChunkedBodyBinary", *recv_callback, l, *b, true))
3370 return nullptr;
3371 if (b)
3372 b->clear();
3373 }
3374
3375 if (read_once) {
3376 assert(!recv_callback);
3377 assert(!os);
3378 ReferenceHolder<QoreHashNode> h(new QoreHashNode(autoTypeInfo), xsink);
3379 h->setKeyValue("body", b.release(), xsink);
3380 return h.release();
3381 }
3382
3383 // ensure string is blanked for next read
3384 str.clear();
3385 }
3386
3387 // read footers or nothing
3388 QoreStringNodeHolder hdr(readHTTPData(xsink, "readHTTPChunkedBodyBinary", timeout, rc, true));
3389 if (*xsink)
3390 return nullptr;
3391
3392 ReferenceHolder<QoreHashNode> h(new QoreHashNode(autoTypeInfo), xsink);
3393 if (!recv_callback && !os) {
3394 h->setKeyValue("body", b.release(), xsink);
3395 }
3396
// NOTE(review): "info" is used below but its declaration is not visible in this listing (original line 3397
// is absent); presumably a ReferenceHolder<QoreHashNode> - confirm against the original file
3398
3399 if (hdr) {
// NOTE(review): 2-4 bytes is presumably just the terminating line with no footers - confirm
3400 if (hdr->strlen() >= 2 && hdr->strlen() <= 4)
3401 return recv_callback ? 0 : h.release();
3402
3403 if (recv_callback) {
3404 info = new QoreHashNode(autoTypeInfo);
3405 }
3406 convertHeaderToHash(*h, (char*)hdr->c_str(), 0, *info, nullptr, "response-headers-raw");
3407 do_read_http_header(QORE_EVENT_HTTP_FOOTERS_RECEIVED, *h, source);
3408 }
3409
// with a callback, the footers are delivered through the callback and nothing is returned
3410 if (recv_callback) {
3411 runHeaderCallback(xsink, cname, "readHTTPChunkedBodyBinary", *recv_callback, l, h->empty() ? nullptr : *h,
3412 info.release(), false, obj);
3413 return 0;
3414 }
3415
3416 return h.release();
3417 }
3418
3419 // receive a message in HTTP chunked format
// reads an HTTP chunked body from the socket as string data
// - chunk data is appended to "buf"; when "recv_callback" is given, runDataCallback() is called with "buf"
//   after each chunk and "buf" is cleared
// - after the zero-size chunk, the remaining data is read with readHTTPData() and, unless it is only 2-4 bytes
//   long, parsed as footers with convertHeaderToHash()
// returns a hash ("body" is set only when no "recv_callback" is given); returns 0 / nullptr when an exception
// was raised and also when "recv_callback" is given
3420 DLLLOCAL QoreHashNode* readHttpChunkedBody(int timeout, ExceptionSink* xsink, const char* cname, int source,
3421 const ResolvedCallReferenceNode* recv_callback = 0, QoreThreadLock* l = 0, QoreObject* obj = 0) {
3422 assert(xsink);
3423
3424 if (sock == QORE_INVALID_SOCKET) {
3425 se_not_open(cname, "readHTTPChunkedBody", xsink);
3426 return 0;
3427 }
3428 if (in_op >= 0) {
3429 if (in_op == q_gettid()) {
3430 se_in_op(cname, "readHTTPChunkedBody", xsink);
3431 return 0;
3432 }
3433 se_in_op_thread(cname, "readHTTPChunkedBody", xsink);
3434 return 0;
3435 }
3436
3437 // reset "expecting HTTP chunked body" flag
3438 if (http_exp_chunked_body)
3439 http_exp_chunked_body = false;
3440
3441 qore_socket_op_helper oh(this);
3442
// NOTE(review): "buf" is used below but its declaration is not visible in this listing (original line 3443
// is absent); presumably a QoreStringNodeHolder holding the body string - confirm against the original file
3444 QoreString str; // for reading the size of each chunk
3445
3446 ssize_t rc;
3447 // read the size then read the data and append to buf
3448 while (true) {
3449 // state = 0, nothing
3450 // state = 1, \r received
3451 int state = 0;
// read the chunk-size line one byte at a time until "\r\n" is seen
3452 while (true) {
3453 char* tbuf;
3454 rc = brecv(xsink, "readHTTPChunkedBody", tbuf, 1, 0, timeout, false);
3455 if (rc <= 0) {
3456 if (!*xsink) {
3457 assert(!rc);
3458 se_closed(cname, "readHTTPChunkedBody", xsink);
3459 }
3460 return 0;
3461 }
3462
3463 char c = tbuf[0];
3464
3465 if (!state && c == '\r')
3466 state = 1;
3467 else if (state && c == '\n')
3468 break;
3469 else {
// a '\r' that was not followed by '\n' is kept as data
3470 if (state) {
3471 state = 0;
3472 str.concat('\r');
3473 }
3474 str.concat(c);
3475 }
3476 }
3477 // DEBUG
3478 //printd(5, "got chunk size (" QSD " bytes) string: %s\n", str.strlen(), str.c_str());
3479
3480 // terminate string at ';' char if present
3481 char* p = (char*)strchr(str.c_str(), ';');
3482 if (p)
3483 *p = '\0';
// the chunk size is parsed as a hexadecimal number
3484 ssize_t size = strtol(str.c_str(), 0, 16);
3485 do_chunked_read(QORE_EVENT_HTTP_CHUNK_SIZE, size, str.strlen(), source);
3486
// a zero size ends the chunk loop
3487 if (!size)
3488 break;
3489
3490 if (size < 0) {
3491 xsink->raiseException("READ-HTTP-CHUNK-ERROR", "negative value given for chunk size (%ld)", size);
3492 return 0;
3493 }
3494 // ensure string is blanked for next read
3495 str.clear();
3496
3497 // prepare string for chunk
3498 //buf->allocate((unsigned)(buf->strlen() + size + 1));
3499
3500 // read chunk directly into string buffer
// request at most DEFAULT_SOCKET_BUFSIZE bytes per read
3501 ssize_t bs = size < DEFAULT_SOCKET_BUFSIZE ? size : DEFAULT_SOCKET_BUFSIZE;
3502 ssize_t br = 0; // bytes received
3503 str.clear();
3504 while (true) {
3505 char* tbuf;
3506 rc = brecv(xsink, "readHTTPChunkedBody", tbuf, bs, 0, timeout, false);
3507 if (rc <= 0) {
3508 if (!*xsink) {
3509 assert(!rc);
3510 se_closed(cname, "readHTTPChunkedBody", xsink);
3511 }
3512 return 0;
3513 }
3514
3515 br += rc;
3516 buf->concat(tbuf, rc);
3517
3518 do_data_event(QORE_EVENT_HTTP_CHUNKED_DATA_READ, source, tbuf, (size_t)rc);
3519
3520 if (br >= size)
3521 break;
// shrink the request so that no more than the rest of the chunk is asked for
3522 if (size - br < bs)
3523 bs = size - br;
3524 }
3525
3526 // DEBUG
3527 //printd(5, "got chunk (" QSD " bytes): %s\n", br, buf->c_str() + buf->strlen() - size);
3528
3529 // read crlf after chunk
3530 // FIXME: bytes read are not checked if they equal CRLF
3531 br = 0;
3532 while (br < 2) {
3533 char* tbuf;
3534 rc = brecv(xsink, "readHTTPChunkedBody", tbuf, 2 - br, 0, timeout, false);
3535 if (rc <= 0) {
3536 if (!*xsink) {
3537 assert(!rc);
3538 se_closed(cname, "readHTTPChunkedBody", xsink);
3539 }
3540 return nullptr;
3541 }
3542 br += rc;
3543 }
3544
3545 do_chunked_read(QORE_EVENT_HTTP_CHUNKED_DATA_RECEIVED, size, size + 2, source);
3546
// hand the chunk to the callback and start the next chunk with an empty buffer
3547 if (recv_callback) {
3548 if (runDataCallback(xsink, cname, "readHTTPChunkedBody", *recv_callback, l, *buf, true))
3549 return nullptr;
3550 buf->clear();
3551 }
3552 }
3553
3554 // read footers or nothing
3555 QoreStringNodeHolder hdr(readHTTPData(xsink, "readHTTPChunkedBody", timeout, rc, true));
3556 if (*xsink)
3557 return nullptr;
3558
3559 //printd(5, "chunked body encoding: %s\n", buf->getEncoding()->getCode());
3560 ReferenceHolder<QoreHashNode> h(new QoreHashNode(autoTypeInfo), xsink);
3561 if (!recv_callback) {
3562 h->setKeyValue("body", buf.release(), xsink);
3563 }
3564
// NOTE(review): "info" is used below but its declaration is not visible in this listing (original line 3565
// is absent); presumably a ReferenceHolder<QoreHashNode> - confirm against the original file
3566
3567 if (hdr) {
// NOTE(review): 2-4 bytes is presumably just the terminating line with no footers - confirm
3568 if (hdr->strlen() >= 2 && hdr->strlen() <= 4)
3569 return recv_callback ? 0 : h.release();
3570
3571 if (recv_callback) {
3572 info = new QoreHashNode(autoTypeInfo);
3573 }
3574 convertHeaderToHash(*h, (char*)hdr->c_str(), 0, *info, nullptr, "response-headers-raw");
3575 do_read_http_header(QORE_EVENT_HTTP_FOOTERS_RECEIVED, *h, source);
3576 }
3577
// with a callback, the footers are delivered through the callback and nothing is returned
3578 if (recv_callback) {
3579 runHeaderCallback(xsink, cname, "readHTTPChunkedBody", *recv_callback, l, h->empty() ? nullptr : *h,
3580 info.release(), false, obj);
3581 return 0;
3582 }
3583
3584 return h.release();
3585 }
3586
3587 DLLLOCAL QoreHashNode* readServerSentEvent(ExceptionSink* xsink, Transform* transform, int timeout_ms);
3588
3589 DLLLOCAL static QoreHashNode* parseServerSentEvent(ExceptionSink* xsink, const QoreString& buf);
3590
3591 DLLLOCAL static void do_accept_encoding(char* t, QoreHashNode& info) {
3592 ReferenceHolder<QoreListNode> l(new QoreListNode(autoTypeInfo), 0);
3593
3594 char* a = t;
3595 bool ok = true;
3596 while (*a) {
3597 if (ok) {
3598 ok = false;
3600 while (*a && *a != ';' && *a != ',')
3601 str->concat(*(a++));
3602 str->trim();
3603 if (!str->empty())
3604 l->push(str.release(), nullptr);
3605 continue;
3606 }
3607 else if (*a == ',')
3608 ok = true;
3609
3610 ++a;
3611 }
3612
3613 if (!l->empty())
3614 info.setKeyValue("accept-encoding", l.release(), 0);
3615 }
3616
3617 DLLLOCAL bool do_accept_charset(char* t, QoreHashNode& info) {
3618 bool acceptcharset = false;
3619
3620 // see if we have "*" or utf8 or utf-8, in which case set it
3621 // otherwise set the first charset in the list
3622 char* a = t;
3623 char* div = 0;
3624 bool utf8 = false;
3625 bool ok = true;
3626 while (*a) {
3627 if (ok) {
3628 if (*a == '*') {
3629 utf8 = true;
3630 break;
3631 }
3632 ok = false;
3633 if (*a == 'u' || *a == 'U') {
3634 ++a;
3635 if (*a == 't' || *a == 'T') {
3636 ++a;
3637 if (*a == 'f' || *a == 'F') {
3638 ++a;
3639 if (*a == '-')
3640 ++a;
3641 if (*a == '8') {
3642 utf8 = true;
3643 break;
3644 }
3645 }
3646 }
3647 continue;
3648 }
3649 } else if (*a == ',') {
3650 if (!div)
3651 div = a;
3652 ok = true;
3653 } else if (*a == ';') {
3654 if (!div)
3655 div = a;
3656 }
3657
3658 ++a;
3659 }
3660 if (utf8) {
3661 info.setKeyValue("accept-charset", new QoreStringNode("utf8"), 0);
3662 acceptcharset = true;
3663 } else {
3665 if (div)
3666 ac->concat(t, div - t);
3667 else
3668 ac->concat(t);
3669 ac->trim();
3670 if (!ac->empty()) {
3671 info.setKeyValue("accept-charset", ac.release(), 0);
3672 acceptcharset = true;
3673 }
3674 }
3675
3676 return acceptcharset;
3677 }
3678
3679 // returns true if the connection should be closed, false if not
// parses the header text at "p" in place (line terminators and the first ':' of each line are overwritten
// with '\0') and stores every header in "h" under its lower-cased name; a repeated header becomes a list
// when "info" is given, the headers are also stored under their original names in a new hash that is set in
// "info" under "headers_raw_key"
// with CHF_PROCESS set in "flags", the "connection", "content-type", "transfer-encoding", "accept-charset"
// and (with CHF_REQUEST) "accept-encoding" headers are additionally interpreted, and the member "enc" is set
// parsing stops at the first line without a terminator or without a ':'
3680 DLLLOCAL bool convertHeaderToHash(QoreHashNode* h, char* p, int flags = 0, QoreHashNode* info = nullptr,
3681 bool* chunked = nullptr, const char* headers_raw_key = "headers-raw") {
// the initial value is "close" unless CHF_HTTP11 is set
3682 bool close = !(flags & CHF_HTTP11);
3683 // socket encoding
3684 const char* senc = nullptr;
3685 // accept-charset
3686 bool acceptcharset = false;
3687
3688 QoreHashNode* raw_hdr = nullptr;
3689 if (info) {
3690 info->setKeyValue(headers_raw_key, raw_hdr = new QoreHashNode(autoTypeInfo), nullptr);
3691 }
3692
3693 // raw key for setting raw headers
3694 std::string raw_key;
3695
3696 while (*p) {
3697 char* buf = p;
3698
// cut the current line at "\r\n", "\n" or "\r" (tried in that order) and move "p" to the next line
3699 if ((p = strstr(buf, "\r\n"))) {
3700 *p = '\0';
3701 p += 2;
3702 } else if ((p = strchr(buf, '\n'))) {
3703 *p = '\0';
3704 p++;
3705 } else if ((p = strchr(buf, '\r'))) {
3706 *p = '\0';
3707 p++;
3708 } else
3709 break;
// split the line into name ("buf") and value ("t") at the first ':'
3710 char* t = strchr(buf, ':');
3711 if (!t)
3712 break;
3713 *t = '\0';
3714 t++;
3715 while (t && qore_isblank(*t))
3716 t++;
// keep the name as received before it is lower-cased
3717 if (raw_hdr) {
3718 raw_key = buf;
3719 }
3720 strtolower(buf);
3721 //printd(5, "setting %s = '%s'\n", buf, t);
3722
3723 ReferenceHolder<> val(new QoreStringNode(t), nullptr);
3724
3725 if (flags & CHF_PROCESS) {
3726 if (!strcmp(buf, "connection")) {
3727 if (flags & CHF_HTTP11) {
3728 if (strcasestr(t, "close"))
3729 close = true;
3730 } else {
3731 if (strcasestr(t, "keep-alive"))
3732 close = false;
3733 }
3734 } else if (!strcmp(buf, "content-type")) {
3735 char* a = strcasestr(t, "charset=");
3736 if (a) {
3737 // find end
3738 char* e = strchr(a + 8, ';');
3739
3740 QoreString cs;
3741 if (e)
3742 cs.concat(a + 8, e - a - 8);
3743 else
3744 cs.concat(a + 8);
3745 cs.trim();
// NOTE(review): "senc" points into "cs"; after this block it is only tested for null
3746 senc = cs.c_str();
3747 //printd(5, "got encoding '%s' from request\n", senc);
3748 enc = QEM.findCreate(senc);
3749
3750 if (info) {
3751 size_t len = cs.size();
3752 info->setKeyValue("charset", new QoreStringNode(cs.giveBuffer(), len, len + 1, QCS_DEFAULT), nullptr);
3753 }
3754
3755 if (info) {
// NOTE(review): "ct" is used below but its declaration is not visible in this listing (original line 3756
// is absent); presumably a QoreStringNodeHolder - confirm against the original file
3757 // remove any whitespace and ';' before charset=
3758 if (a != t) {
3759 do {
3760 --a;
3761 } while (a > t && (*a == ' ' || *a == ';'));
3762 }
3763
// "ct" receives the content-type value with the charset parameter removed
3764 if (a == t) {
3765 if (e)
3766 ct->concat(e + 1);
3767 } else {
3768 ct->concat(t, a - t + 1);
3769 if (e)
3770 ct->concat(e);
3771 }
3772 ct->trim();
3773 if (!ct->empty())
3774 info->setKeyValue("body-content-type", ct.release(), nullptr);
3775 }
3776 } else {
// no charset given: use the assumed HTTP encoding
3777 enc = QEM.findCreate(assume_http_encoding.c_str());
3778 if (info) {
3779 info->setKeyValue("charset", new QoreStringNode(assume_http_encoding), nullptr);
3780 info->setKeyValue("body-content-type", val->refSelf(), nullptr);
3781 }
3782 }
3783 } else if (chunked && !strcmp(buf, "transfer-encoding") && !strcasecmp(t, "chunked")) {
3784 *chunked = true;
3785 } else if (info) {
3786 if (!strcmp(buf, "accept-charset"))
3787 acceptcharset = do_accept_charset(t, *info);
3788 else if ((flags & CHF_REQUEST) && !strcmp(buf, "accept-encoding"))
3789 do_accept_encoding(t, *info);
3790 }
3791 }
3792
// the raw header hash gets its own copy of the value
3793 ReferenceHolder<> val_copy(nullptr);
3794 if (raw_hdr && val) {
3795 val_copy = val->realCopy();
3796 }
3797
3798 // see if header exists, and if so make it a list and add value to the list
3799 hash_assignment_priv ha(*h, buf);
3800 if (!(*ha).isNothing()) {
3801 QoreListNode* l;
3802 if ((*ha).getType() == NT_LIST) {
3803 l = (*ha).get<QoreListNode>();
3804 } else {
3805 l = new QoreListNode(autoTypeInfo);
3806 l->push(ha.swap(l), nullptr);
3807 }
3808 l->push(val.release(), nullptr);
3809 } else // otherwise set header normally
3810 ha.assign(val.release(), 0);
3811
3812 // set raw headers if applicable
3813 if (raw_hdr) {
3814 hash_assignment_priv ha(*raw_hdr, raw_key);
3815 if (!(*ha).isNothing()) {
3816 QoreListNode* l;
3817 if ((*ha).getType() == NT_LIST) {
3818 l = (*ha).get<QoreListNode>();
3819 } else {
3820 l = new QoreListNode(autoTypeInfo);
3821 l->push(ha.swap(l), nullptr);
3822 }
3823 l->push(val_copy.release(), nullptr);
3824 } else // otherwise set header normally
3825 ha.assign(val_copy.release(), nullptr);
3826 }
3827 }
3828
3829 if ((flags & CHF_PROCESS)) {
// no charset was found in any content-type header
3830 if (!senc)
3831 enc = QEM.findCreate(assume_http_encoding.c_str());
3832 // according to RFC-2616 section 14.2, "If no Accept-Charset header is present, the default is that any character set is acceptable" so we will use utf-8
3833 if (info && !acceptcharset)
3834 info->setKeyValue("accept-charset", new QoreStringNode("utf8"), nullptr);
3835 }
3836
3837 return close;
3838 }
3839
3840 DLLLOCAL int recvix(const char* meth, int len, void* targ, int timeout_ms, ExceptionSink* xsink) {
3841 assert(xsink);
3842 if (sock == QORE_INVALID_SOCKET) {
3843 se_not_open("Socket", meth, xsink, "recvix");
3844 return QSE_NOT_OPEN;
3845 }
3846 if (in_op >= 0) {
3847 if (in_op == q_gettid()) {
3848 se_in_op("Socket", meth, xsink);
3849 return 0;
3850 }
3851 se_in_op_thread("Socket", meth, xsink);
3852 return 0;
3853 }
3854
3855 PrivateQoreSocketThroughputHelper th(this, false);
3856
3857 char* buf;
3858 ssize_t br = 0;
3859 while (true) {
3860 ssize_t rc = brecv(xsink, meth, buf, len - br, 0, timeout_ms);
3861 if (rc <= 0) {
3862 do_read_error(rc, meth, timeout_ms, xsink);
3863 return (int)rc;
3864 }
3865
3866 memcpy(targ, buf, rc);
3867
3868 br += rc;
3869 if (br >= len)
3870 break;
3871 }
3872
3873 th.finalize(br);
3874 do_data_event(QORE_EVENT_SOCKET_DATA_READ, QORE_SOURCE_SOCKET, targ, br);
3875 return (int)br;
3876 }
3877
3878 DLLLOCAL void clearWarningQueue(ExceptionSink* xsink) {
3879 if (warn_queue) {
3880 if (warn_callback_arg) {
3881 warn_callback_arg.discard(xsink);
3882 warn_callback_arg = QoreValue();
3883 }
3884 warn_queue->deref(xsink);
3885 warn_queue = nullptr;
3886 tl_warning_us = 0;
3887 tp_warning_bs = 0.0;
3888 tp_us_min = 0;
3889 }
3890 }
3891
3892 DLLLOCAL void setWarningQueue(ExceptionSink* xsink, int64 warning_ms, int64 warning_bs, Queue* wq, QoreValue arg,
3893 int64 min_ms = 1000) {
3894 ReferenceHolder<Queue> qholder(wq, xsink);
3895 ValueHolder holder(arg, xsink);
3896 if (warning_ms <= 0 && warning_bs <= 0) {
3897 xsink->raiseException("SOCKET-SETWARNINGQUEUE-ERROR", "Socket::setWarningQueue() at least one of warning "
3898 "ms argument: " QLLD " and warning B/s argument: " QLLD " must be greater than zero; to clear, call "\
3899 "Socket::clearWarningQueue() with no arguments", warning_ms, warning_bs);
3900 return;
3901 }
3902
3903 if (warning_ms < 0)
3904 warning_ms = 0;
3905 if (warning_bs < 0)
3906 warning_bs = 0;
3907
3908 if (warn_queue) {
3909 warn_queue->deref(xsink);
3910 warn_callback_arg.discard(xsink);
3911 }
3912
3913 warn_queue = qholder.release();
3914 warn_callback_arg = holder.release();
3915 tl_warning_us = (int64)warning_ms * 1000;
3916 tp_warning_bs = warning_bs;
3917 tp_us_min = min_ms * 1000;
3918 }
3919
3920 DLLLOCAL void getUsageInfo(QoreHashNode& h, qore_socket_private& s) const {
3921 if (warn_queue) {
3922 h.setKeyValue("arg", warn_callback_arg.refSelf(), 0);
3923 h.setKeyValue("timeout", tl_warning_us, 0);
3924 h.setKeyValue("min_throughput", (int64)tp_warning_bs, 0);
3925 h.setKeyValue("min_throughput_us", (int64)tp_us_min, 0);
3926 }
3927
3928 h.setKeyValue("bytes_sent", tp_bytes_sent + s.tp_bytes_sent, 0);
3929 h.setKeyValue("bytes_recv", tp_bytes_recv + s.tp_bytes_sent, 0);
3930 h.setKeyValue("us_sent", tp_us_sent + s.tp_us_sent, 0);
3931 h.setKeyValue("us_recv", tp_us_recv + s.tp_us_recv, 0);
3932 }
3933
3934 DLLLOCAL void getUsageInfo(QoreHashNode& h) const {
3935 if (warn_queue) {
3936 h.setKeyValue("arg", warn_callback_arg.refSelf(), 0);
3937 h.setKeyValue("timeout", tl_warning_us, 0);
3938 h.setKeyValue("min_throughput", (int64)tp_warning_bs, 0);
3939 h.setKeyValue("min_throughput_us", (int64)tp_us_min, 0);
3940 }
3941
3942 h.setKeyValue("bytes_sent", tp_bytes_sent, 0);
3943 h.setKeyValue("bytes_recv", tp_bytes_recv, 0);
3944 h.setKeyValue("us_sent", tp_us_sent, 0);
3945 h.setKeyValue("us_recv", tp_us_recv, 0);
3946 }
3947
3948 DLLLOCAL QoreHashNode* getUsageInfo() const {
3949 QoreHashNode* h = new QoreHashNode(autoTypeInfo);
3950 getUsageInfo(*h);
3951 return h;
3952 }
3953
3954 DLLLOCAL void clearStats() {
3955 tp_bytes_sent = 0;
3956 tp_bytes_recv = 0;
3957 tp_us_sent = 0;
3958 tp_us_recv = 0;
3959 }
3960
3961 DLLLOCAL void doTimeoutWarning(const char* op, int64 dt) {
3962 assert(warn_queue);
3963 assert(dt > tl_warning_us);
3964
3965 QoreHashNode* h = new QoreHashNode(autoTypeInfo);
3966
3967 h->setKeyValue("type", new QoreStringNode("SOCKET-OPERATION-WARNING"), 0);
3968 h->setKeyValue("operation", new QoreStringNode(op), 0);
3969 h->setKeyValue("us", dt, 0);
3970 h->setKeyValue("timeout", tl_warning_us, 0);
3971 if (warn_callback_arg)
3972 h->setKeyValue("arg", warn_callback_arg.refSelf(), 0);
3973
3974 warn_queue->pushAndTakeRef(h);
3975 }
3976
3977 DLLLOCAL void doThroughputWarning(bool send, int64 bytes, int64 dt, double bs) {
3978 assert(warn_queue);
3979 assert(bs < tp_warning_bs);
3980
3981 QoreHashNode* h = new QoreHashNode(autoTypeInfo);
3982
3983 h->setKeyValue("type", new QoreStringNode("SOCKET-THROUGHPUT-WARNING"), 0);
3984 h->setKeyValue("dir", new QoreStringNode(send ? "send" : "recv"), 0);
3985 h->setKeyValue("bytes", bytes, 0);
3986 h->setKeyValue("us", dt, 0);
3987 h->setKeyValue("bytes_sec", bs, 0);
3988 h->setKeyValue("threshold", (int64)tp_warning_bs, 0);
3989 if (warn_callback_arg)
3990 h->setKeyValue("arg", warn_callback_arg.refSelf(), 0);
3991
3992 warn_queue->pushAndTakeRef(h);
3993 }
3994
3995 DLLLOCAL bool pendingHttpChunkedBody() const {
3996 return http_exp_chunked_body && sock != QORE_INVALID_SOCKET;
3997 }
3998
3999 DLLLOCAL void setSslVerifyMode(int mode) {
4000 //printd(5, "qore_socket_private::setSslVerifyMode() this: %p mode: %d\n", this, mode);
4001 ssl_verify_mode = mode;
4002 if (ssl)
4003 ssl->setVerifyMode(ssl_verify_mode, ssl_accept_all_certs, client_target);
4004 }
4005
4006 DLLLOCAL void acceptAllCertificates(bool accept_all = true) {
4007 ssl_accept_all_certs = accept_all;
4008 if (ssl)
4009 ssl->setVerifyMode(ssl_verify_mode, ssl_accept_all_certs, client_target);
4010 }
4011
4012 DLLLOCAL void setSslErrorString(QoreStringNode* err_str) {
4013 if (ssl_err_str) {
4014 ssl_err_str->concat("; ");
4015 ssl_err_str->concat(err_str);
4016 err_str->deref();
4017 } else {
4018 ssl_err_str = err_str;
4019 }
4020 }
4021
4022 DLLLOCAL static void getUsageInfo(const QoreSocket& sock, QoreHashNode& h, const QoreSocket& s) {
4023 sock.priv->getUsageInfo(h, *s.priv);
4024 }
4025
4026 DLLLOCAL static qore_socket_private* get(QoreSocket& sock) {
4027 return sock.priv;
4028 }
4029
4030 DLLLOCAL static const qore_socket_private* get(const QoreSocket& sock) {
4031 return sock.priv;
4032 }
4033
4034 DLLLOCAL static void captureRemoteCert(X509_STORE_CTX* x509_ctx);
4035
4036 DLLLOCAL static QoreListNode* poll(const QoreListNode* poll_list, int timeout_ms, ExceptionSink* xsink);
4037};
4038
4039#endif
DLLEXPORT const QoreEncoding * QCS_DEFAULT
the default encoding for the Qore library
DLLEXPORT QoreEncodingManager QEM
the QoreEncodingManager object
DLLEXPORT QoreStringNode * q_strerror(int errnum)
returns the error string as a QoreStringNode
#define QORE_MIN(a, b)
macro to return the minimum of 2 numbers
Definition QoreLib.h:616
static void strtolower(char *str)
convert a string to lower-case in place
Definition QoreLib.h:268
The base class for all value and parse types in Qore expression trees.
Definition AbstractQoreNode.h:57
DLLEXPORT AbstractQoreNode * refSelf() const
returns "this" with an incremented reference count
virtual DLLEXPORT AbstractQoreNode * realCopy() const =0
returns a copy of the object; the caller owns the reference count
DLLEXPORT void deref(ExceptionSink *xsink)
decrements the reference count and calls derefImpl() if there_can_be_only_one is false,...
provides a safe and exception-safe way to release and re-acquire locks in Qore, only to be used on th...
Definition QoreThreadLock.h:190
holds arbitrary binary data
Definition BinaryNode.h:41
DLLEXPORT void append(const void *nptr, size_t size)
resizes the object and appends a copy of the data passed to the object
DLLEXPORT size_t size() const
returns the number of bytes in the object
DLLEXPORT bool empty() const
returns true if empty
DLLEXPORT const void * getPtr() const
returns the pointer to the data
constant iterator class for QoreHashNode, to be only created on the stack
Definition QoreHashNode.h:606
For use on the stack only: iterates through elements of a const QoreListNode.
Definition QoreListNode.h:575
container for holding Qore-language exception information and also for registering a "thread_exit" ca...
Definition ExceptionSink.h:50
DLLEXPORT int appendLastDescription(const char *fmt,...)
appends a formatted string to the top exception description if the desc value is a string
DLLEXPORT AbstractQoreNode * raiseException(const char *err, const char *fmt,...)
appends a Qore-language exception to the list
DLLEXPORT AbstractQoreNode * raiseErrnoException(const char *err, int en, const char *fmt,...)
appends a Qore-language exception to the list and appends the result of strerror(errno) to the descri...
DLLEXPORT AbstractQoreNode * raiseExceptionArg(const char *err, QoreValue arg, const char *fmt,...)
appends a Qore-language exception to the list, and sets the 'arg' member (this object takes over the ...
Interface for private data of input streams.
Definition InputStream.h:44
virtual int64 read(void *ptr, int64 limit, ExceptionSink *xsink)=0
Reads up to `limit` bytes from the input stream.
Interface for private data of output streams.
Definition OutputStream.h:44
virtual void write(const void *ptr, int64 count, ExceptionSink *xsink)=0
Writes bytes to the output stream.
provides an interface to getaddrinfo
Definition QoreNet.h:132
DLLLOCAL hashdecl addrinfo * getAddrInfo() const
returns the hashdecl addrinfo * being managed (may be 0)
Definition QoreNet.h:159
static DLLEXPORT QoreStringNode * getAddressDesc(int address_family, const char *addr)
returns a descriptive string for the address family and an address string (ie AF_INET6,...
DLLEXPORT int getInfo(ExceptionSink *xsink, const char *node, const char *service, int family=Q_AF_UNSPEC, int flags=0, int socktype=Q_SOCK_STREAM, int protocol=0)
get address info with the given parameters, if any errors occur, a Qore-language exception is thrown
static DLLEXPORT const char * getFamilyName(int address_family)
returns the name of the address family as a string (ie AF_INET = "ipv4", etc)
defines string encoding functions in Qore
Definition QoreEncoding.h:83
static DLLEXPORT const QoreEncoding * findCreate(const char *name)
finds an encoding if it exists (also looks up against alias names) and creates a new one if it doesn'...
This is the hash or associative list container type in Qore, dynamically allocated only,...
Definition QoreHashNode.h:51
DLLEXPORT int setKeyValue(const char *key, QoreValue value, ExceptionSink *xsink)
sets the value of "key" to "value"
DLLEXPORT QoreHashNode * hashRefSelf() const
returns "this" with an incremented reference count
DLLEXPORT size_t size() const
returns the number of members in the hash, executes in constant time
DLLEXPORT bool empty() const
returns true if the hash has no members, false if not
DLLEXPORT QoreHashNode * copy() const
performs a copy of the hash and returns the new hash
This is the list container type in Qore, dynamically allocated only, reference counted.
Definition QoreListNode.h:52
DLLEXPORT int push(QoreValue val, ExceptionSink *xsink)
adds a value to the list
Qore's arbitrary-precision number value type, dynamically-allocated only, reference counted.
Definition QoreNumberNode.h:51
the implementation of Qore's object data type, reference counted, dynamically-allocated only
Definition QoreObject.h:61
DLLEXPORT AbstractPrivateData * getReferencedPrivateData(qore_classid_t key, ExceptionSink *xsink) const
returns the private data corresponding to the class ID passed with an incremented reference count,...
DLLEXPORT void setValue(const char *key, QoreValue val, ExceptionSink *xsink)
sets the value of the given member to the given value
represents an X509 certificate, reference-counted, dynamically-allocated only
Definition QoreSSLCertificate.h:42
provides access to a private key data structure for SSL connections
Definition QoreSSLPrivateKey.h:40
DLLEXPORT double getAsFloat() const
returns the value as a float
DLLLOCAL detail::QoreValueCastHelper< T >::Result get()
returns the value as the given type
Definition QoreValue.h:214
DLLEXPORT qore_type_t getType() const
returns the type of value contained
DLLEXPORT const char * getTypeName() const
returns a string type description of the value contained (ex: "nothing" for a null AbstractQoreNode p...
DLLEXPORT bool getAsBool() const
returns the value as a bool
DLLEXPORT int64 getAsBigInt() const
returns the value as an int
DLLEXPORT void clear()
unconditionally set the QoreValue to QoreNothingNode (does not dereference any possible contained Abs...
provides access to sockets using Qore data structures
Definition QoreSocket.h:129
Qore's string type supported by the QoreEncoding class.
Definition QoreString.h:93
DLLEXPORT void set(const char *str, const QoreEncoding *new_qorecharset=QCS_DEFAULT)
copies the c-string passed and sets the value of the string and its encoding
DLLEXPORT const char * c_str() const
returns the string's buffer; this data should not be changed
DLLEXPORT size_t strlen() const
returns the number of bytes in the string (not including the null terminator)
DLLEXPORT void clear()
reset string to zero length; memory is not deallocated; string encoding does not change
DLLEXPORT char * giveBuffer()
returns the character buffer and leaves the QoreString empty, the caller owns the memory returned (mu...
DLLEXPORT void concat(const QoreString *str, ExceptionSink *xsink)
concatenates a string and converts encodings if necessary
DLLEXPORT int sprintf(const char *fmt,...)
this will concatenate a formatted string to the existing string according to the format string and t...
DLLEXPORT size_t size() const
returns the number of bytes in the string (not including the null terminator)
DLLEXPORT void trim(const char *chars=0)
remove leading and trailing whitespace or other characters
DLLEXPORT bool empty() const
returns true if the string is empty, false if not
Qore's string value type, reference counted, dynamically-allocated only.
Definition QoreStringNode.h:50
provides a mutually-exclusive thread lock
Definition QoreThreadLock.h:49
a templated class to manage a reference count of an object that can throw a Qore-language exception w...
Definition ReferenceHolder.h:52
base class for resolved call references
Definition CallReferenceNode.h:115
virtual DLLLOCAL QoreValue execValue(const QoreListNode *args, ExceptionSink *xsink) const =0
pure virtual function for executing the function reference
manages a reference count of a pointer to a class that takes a simple "deref()" call with no argument...
Definition ReferenceHolder.h:127
a helper class for getting socket origination information
Definition QoreSocket.h:76
Interface for private data of transformations.
Definition Transform.h:40
holds an object and dereferences it in the destructor
Definition QoreValue.h:487
unsigned qore_classid_t
used for the unique class ID for QoreClass objects
Definition common.h:85
long long int64
64bit integer type, cannot use int64_t here since it breaks the API on some 64-bit systems due to equ...
Definition common.h:266
const qore_type_t NT_BOOLEAN
type value for bools (QoreValue only)
Definition node_types.h:47
const qore_type_t NT_NUMBER
type value for QoreNumberNode
Definition node_types.h:53
const qore_type_t NT_BINARY
type value for BinaryNode
Definition node_types.h:49
const qore_type_t NT_LIST
type value for QoreListNode
Definition node_types.h:50
const qore_type_t NT_NULL
type value for QoreNullNode
Definition node_types.h:48
const qore_type_t NT_INT
type value for integers (QoreValue only)
Definition node_types.h:43
const qore_type_t NT_STRING
type value for QoreStringNode
Definition node_types.h:45
const qore_type_t NT_FLOAT
type value for floating-point values (QoreValue only)
Definition node_types.h:44
const qore_type_t NT_HASH
type value for QoreHashNode
Definition node_types.h:51
const qore_type_t NT_NOTHING
type value for QoreNothingNode
Definition node_types.h:42
DLLEXPORT int q_gettid() noexcept
returns the current TID number
The main value class in Qore, designed to be passed by value.
Definition QoreValue.h:279
DLLEXPORT void discard(ExceptionSink *xsink)
dereferences any contained AbstractQoreNode pointer and sets to 0; does not modify other values
DLLEXPORT QoreValue refSelf() const
references the contained value if type == QV_Node, returns itself