Qore Programming Language 2.1.2
Loading...
Searching...
No Matches
qore_socket_private.h
1/* -*- mode: c++; indent-tabs-mode: nil -*- */
2/*
3 qore_socket_private.h
4
5 Qore Programming Language
6
7 Copyright (C) 2003 - 2024 Qore Technologies, s.r.o.
8
9 Permission is hereby granted, free of charge, to any person obtaining a
10 copy of this software and associated documentation files (the "Software"),
11 to deal in the Software without restriction, including without limitation
12 the rights to use, copy, modify, merge, publish, distribute, sublicense,
13 and/or sell copies of the Software, and to permit persons to whom the
14 Software is furnished to do so, subject to the following conditions:
15
16 The above copyright notice and this permission notice shall be included in
17 all copies or substantial portions of the Software.
18
19 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
20 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
21 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
22 AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
23 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
24 FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
25 DEALINGS IN THE SOFTWARE.
26
27 Note that the Qore library is released under a choice of three open-source
28 licenses: MIT (as above), LGPL 2+, or GPL 2+; see README-LICENSE for more
29 information.
30*/
31
32#ifndef _QORE_QORE_SOCKET_PRIVATE_H
33#define _QORE_QORE_SOCKET_PRIVATE_H
34
35#include "qore/AbstractPollState.h"
36#include "qore/QoreSocket.h"
37#include "qore/InputStream.h"
38#include "qore/OutputStream.h"
39
40#include "qore/intern/SSLSocketHelper.h"
41#include "qore/intern/QC_Queue.h"
42
43#include <cctype>
44#include <cctype>
45#include <cerrno>
46#include <cstdlib>
47#include <cstring>
48#include <strings.h>
49
50#include <openssl/ssl.h>
51#include <openssl/err.h>
52
53#if defined HAVE_POLL
54#include <poll.h>
55#elif defined HAVE_SYS_SELECT_H
56#include <sys/select.h>
57#elif (defined _WIN32 || defined __WIN32__) && ! defined __CYGWIN__
58#define HAVE_SELECT 1
59#else
60#error no async socket I/O APIs available
61#endif
62
63#ifndef DEFAULT_SOCKET_BUFSIZE
64#define DEFAULT_SOCKET_BUFSIZE (64 * 1024)
65#endif
66
67#ifndef QORE_MAX_HEADER_SIZE
68#define QORE_MAX_HEADER_SIZE 16384
69#endif
70
71#define CHF_HTTP11 (1 << 0)
72#define CHF_PROCESS (1 << 1)
73#define CHF_REQUEST (1 << 2)
74
75#ifndef DEFAULT_SOCKET_MIN_THRESHOLD_BYTES
76#define DEFAULT_SOCKET_MIN_THRESHOLD_BYTES 1024
77#endif
78
79static constexpr int SOCK_POLLIN = (1 << 0);
80static constexpr int SOCK_POLLOUT = (1 << 1);
81static constexpr int SOCK_POLLERR = (1 << 2);
82
83DLLLOCAL void concat_target(QoreString& str, const struct sockaddr *addr, const char* type = "target");
84DLLLOCAL int do_read_error(ssize_t rc, const char* method_name, int timeout_ms, ExceptionSink* xsink);
85DLLLOCAL int sock_get_raw_error();
86DLLLOCAL int sock_get_error();
87DLLLOCAL void qore_socket_error(ExceptionSink* xsink, const char* err, const char* cdesc, const char* mname = nullptr,
88 const char* host = nullptr, const char* svc = nullptr, const struct sockaddr *addr = nullptr);
89DLLLOCAL void qore_socket_error_intern(int rc, ExceptionSink* xsink, const char* err, const char* cdesc,
90 const char* mname = nullptr, const char* host = nullptr, const char* svc = nullptr,
91 const struct sockaddr* addr = nullptr);
92DLLLOCAL void se_in_op(const char* cname, const char* meth, ExceptionSink* xsink);
93DLLLOCAL void se_in_op_thread(const char* cname, const char* meth, ExceptionSink* xsink);
94DLLLOCAL void se_not_open(const char* cname, const char* meth, ExceptionSink* xsink, const char* extra = nullptr);
95DLLLOCAL void se_timeout(const char* cname, const char* meth, int timeout_ms, ExceptionSink* xsink,
96 const char* extra = nullptr);
97DLLLOCAL void se_closed(const char* cname, const char* mname, ExceptionSink* xsink);
98
99class Transform;
100
101#ifdef _Q_WINDOWS
102#define GETSOCKOPT_ARG_4 char*
103#define SETSOCKOPT_ARG_4 const char*
104#define SHUTDOWN_ARG SD_BOTH
105#define QORE_INVALID_SOCKET ((int)INVALID_SOCKET)
106#define QORE_SOCKET_ERROR SOCKET_ERROR
107DLLLOCAL int check_windows_rc(int rc);
108DLLLOCAL int windows_set_errno();
109
110#ifndef ECONNRESET
111#define ECONNRESET WSAECONNRESET
112#endif
113
114#else
115// UNIX/Cygwin
116#define GETSOCKOPT_ARG_4 void*
117#define SETSOCKOPT_ARG_4 void*
118#define SHUTDOWN_ARG SHUT_RDWR
119#define QORE_INVALID_SOCKET -1
120#define QORE_SOCKET_ERROR -1
121#endif
122
123template <typename T>
124class PrivateDataListHolder {
125public:
126 DLLLOCAL PrivateDataListHolder(ExceptionSink* xsink) : xsink(xsink) {
127 }
128
129 DLLLOCAL ~PrivateDataListHolder() {
130 for (auto& i : pd_vec)
131 i->deref(xsink);
132 }
133
134 DLLLOCAL T* add(const QoreObject* o, qore_classid_t cid) {
135 T* pd = static_cast<T*>(o->getReferencedPrivateData(cid, xsink));
136 if (!pd)
137 return nullptr;
138 pd_vec.push_back(pd);
139 return pd;
140 }
141
142private:
143 typedef std::vector<T*> pd_vec_t;
144 pd_vec_t pd_vec;
145 ExceptionSink* xsink;
146};
147
148hashdecl qore_socketsource_private {
149 QoreStringNode* address;
150 QoreStringNode* hostname;
151
152 DLLLOCAL qore_socketsource_private() : address(0), hostname(0) {
153 }
154
155 DLLLOCAL ~qore_socketsource_private() {
156 if (address) address->deref();
157 if (hostname) hostname->deref();
158 }
159
160 DLLLOCAL void setAddress(QoreStringNode* addr) {
161 assert(!address);
162 address = addr;
163 }
164
165 DLLLOCAL void setAddress(const char* addr) {
166 assert(!address);
167 address = new QoreStringNode(addr);
168 }
169
170 DLLLOCAL void setHostName(const char* host) {
171 assert(!hostname);
172 hostname = new QoreStringNode(host);
173 }
174
175 DLLLOCAL void setAll(QoreObject* o, ExceptionSink* xsink) {
176 if (address) {
177 o->setValue("source", address, xsink);
178 address = 0;
179 }
180
181 if (hostname) {
182 o->setValue("source_host", hostname, xsink);
183 hostname = 0;
184 }
185 }
186};
187
188class OptionalNonBlockingHelper {
189public:
190 qore_socket_private& sock;
191 ExceptionSink* xsink;
192 bool set;
193
194 DLLLOCAL OptionalNonBlockingHelper(qore_socket_private& s, bool n_set, ExceptionSink* xs);
195 DLLLOCAL ~OptionalNonBlockingHelper();
196};
197
198class PrivateQoreSocketTimeoutBase {
199public:
200 DLLLOCAL PrivateQoreSocketTimeoutBase(qore_socket_private* s) : sock(s), start(sock ? q_clock_getmicros() : 0) {
201 }
202
203protected:
204 hashdecl qore_socket_private* sock;
205 int64 start;
206};
207
208class PrivateQoreSocketTimeoutHelper : public PrivateQoreSocketTimeoutBase {
209public:
210 DLLLOCAL PrivateQoreSocketTimeoutHelper(qore_socket_private* s, const char* op);
211 DLLLOCAL ~PrivateQoreSocketTimeoutHelper();
212
213protected:
214 const char* op;
215};
216
217class PrivateQoreSocketThroughputHelper : public PrivateQoreSocketTimeoutBase {
218public:
219 DLLLOCAL PrivateQoreSocketThroughputHelper(qore_socket_private* s, bool snd);
220 DLLLOCAL ~PrivateQoreSocketThroughputHelper();
221
222 DLLLOCAL void finalize(int64 bytes);
223
224protected:
225 bool send;
226};
227
228hashdecl qore_socket_private;
229
230hashdecl qore_socket_op_helper {
231protected:
232 qore_socket_private* s;
233
234public:
235 DLLLOCAL qore_socket_op_helper(qore_socket_private* sock);
236 DLLLOCAL ~qore_socket_op_helper();
237};
238
239class SSLSocketHelperHelper {
240protected:
241 qore_socket_private* s;
242 SSLSocketHelper* ssl;
243 bool context_saved = false;
244
245public:
246 DLLLOCAL SSLSocketHelperHelper(qore_socket_private* sock, bool set_thread_context = false);
247
248 DLLLOCAL ~SSLSocketHelperHelper();
249
250 DLLLOCAL void error();
251};
252
253constexpr int SCIPS_CONNECT = 0;
254constexpr int SCIPS_CHECK_CONNECT = 1;
255
256class SocketConnectInetPollState : public AbstractPollState {
257public:
258 DLLLOCAL SocketConnectInetPollState(ExceptionSink* xsink, qore_socket_private* sock, const char* host,
259 const char* service, int family = AF_UNSPEC, int type = SOCK_STREAM, int protocol = 0);
260
267 DLLLOCAL virtual int continuePoll(ExceptionSink* xsink);
268
269private:
270 QoreAddrInfo ai;
271 qore_socket_private* sock;
272 std::string host, service;
273 hashdecl addrinfo* p = nullptr;
274 int prt = -1;
275 int state = SCIPS_CONNECT;
276
277 DLLLOCAL int doConnect(ExceptionSink* xsink);
278
279 // returns 0 = connected, 1 = try again, -1 = error
280 DLLLOCAL int checkConnection(ExceptionSink* xsink);
281
283 DLLLOCAL int next(ExceptionSink* xsink);
284
286 DLLLOCAL int nextIntern(ExceptionSink* xsink);
287};
288
289#ifndef _Q_WINDOWS
290class SocketConnectUnixPollState : public AbstractPollState {
291public:
292 DLLLOCAL SocketConnectUnixPollState(ExceptionSink* xsink, qore_socket_private* sock, const char* name,
293 int type = SOCK_STREAM, int protocol = 0);
294
301 DLLLOCAL virtual int continuePoll(ExceptionSink* xsink);
302
303private:
304 qore_socket_private* sock;
305 std::string name;
306 hashdecl sockaddr_un addr;
307 int state = SCIPS_CONNECT;
308
309 DLLLOCAL int doConnect(ExceptionSink* xsink);
310
311 // returns 0 = connected, 1 = try again, -1 = error
312 DLLLOCAL int checkConnection(ExceptionSink* xsink);
313};
314#endif
315
316class SocketConnectSslPollState : public AbstractPollState {
317public:
318 DLLLOCAL SocketConnectSslPollState(ExceptionSink* xsink, qore_socket_private* sock,
319 QoreSSLCertificate* cert = nullptr, QoreSSLPrivateKey* pkey = nullptr);
320
327 DLLLOCAL virtual int continuePoll(ExceptionSink* xsink);
328
329private:
330 qore_socket_private* sock;
331
332 // returns 0 = connected, 1 = try again, -1 = error
333 DLLLOCAL int checkConnection(ExceptionSink* xsink);
334};
335
336class SocketAcceptPollState : public AbstractPollState {
337public:
338 DLLLOCAL SocketAcceptPollState(ExceptionSink* xsink, qore_socket_private* sock);
339
346 DLLLOCAL virtual int continuePoll(ExceptionSink* xsink);
347
349 DLLLOCAL int getDescriptor() const {
350 return descriptor;
351 }
352
353private:
354 qore_socket_private* sock;
355 int descriptor = -1;
356};
357
358class SocketAcceptSslPollState : public AbstractPollState {
359public:
360 DLLLOCAL SocketAcceptSslPollState(ExceptionSink* xsink, qore_socket_private* sock,
361 QoreSSLCertificate* cert = nullptr, QoreSSLPrivateKey* pkey = nullptr);
362
369 DLLLOCAL virtual int continuePoll(ExceptionSink* xsink);
370
371private:
372 qore_socket_private* sock;
373};
374
375class SocketSendPollState : public AbstractPollState {
376public:
377 DLLLOCAL SocketSendPollState(ExceptionSink* xsink, qore_socket_private* sock, const char* data, size_t size);
378
385 DLLLOCAL virtual int continuePoll(ExceptionSink* xsink);
386
388 DLLLOCAL size_t getBytesSent() const {
389 return sent;
390 }
391
392private:
393 qore_socket_private* sock;
394 const char* data;
395 size_t size;
396 size_t sent = 0;
397};
398
399class SocketRecvPollState : public AbstractPollState {
400public:
401 DLLLOCAL SocketRecvPollState(ExceptionSink* xsink, qore_socket_private* sock, size_t size);
402
409 DLLLOCAL virtual int continuePoll(ExceptionSink* xsink);
410
412 DLLLOCAL virtual QoreValue takeOutput() {
413 QoreValue rv = bin.release();
414 bin = nullptr;
415 return rv;
416 }
417
419 DLLLOCAL size_t getBytesReceived() const {
420 return received;
421 }
422
423private:
424 qore_socket_private* sock;
426 size_t size;
427 size_t received = 0;
428};
429
430class SocketRecvPacketPollState : public AbstractPollState {
431public:
432 DLLLOCAL SocketRecvPacketPollState(ExceptionSink* xsink, qore_socket_private* sock);
433
440 DLLLOCAL virtual int continuePoll(ExceptionSink* xsink);
441
443 DLLLOCAL virtual QoreValue takeOutput() {
444 QoreValue rv = bin.release();
445 bin = nullptr;
446 return rv;
447 }
448
450 DLLLOCAL size_t getBytesReceived() const {
451 return bin ? bin->size() : 0;
452 }
453
454private:
455 qore_socket_private* sock;
457 bool io = false;
458};
459
460class SocketRecvUntilBytesPollState : public AbstractPollState {
461public:
462 DLLLOCAL SocketRecvUntilBytesPollState(ExceptionSink* xsink, qore_socket_private* sock, const char* bytes,
463 size_t size);
464
471 DLLLOCAL virtual int continuePoll(ExceptionSink* xsink);
472
474 DLLLOCAL virtual QoreValue takeOutput() {
475 size_t len = bin->size();
476 BinaryNode* rv = new BinaryNode(bin->giveBuffer(), len);
477 bin = nullptr;
478 return rv;
479 }
480
482 DLLLOCAL size_t getBytesReceived() const {
483 return bin ? bin->size() : 0;
484 }
485
486private:
487 qore_socket_private* sock;
488 // we are using QoreStringNode as it has a much better append / concat implementation than BinaryNode
490 const char* bytes;
491 size_t size;
492 size_t matched = 0;
493
494 DLLLOCAL int doRecv(ExceptionSink* xsink);
495};
496
497
498hashdecl qore_socket_private {
499 friend class PrivateQoreSocketTimeoutHelper;
500 friend class PrivateQoreSocketThroughputHelper;
501 friend class SocketConnectInetPollState;
502
503 // for client certificate capture
504 static thread_local qore_socket_private* current_socket;
505
506 int sock, sfamily, port, stype, sprot;
507
508 // issue #3558: connection sequence to show when a connection has been reestablished
509 int64 connection_id = 0;
510
511 const QoreEncoding* enc;
512
513 std::string socketname;
514 // issue #3053: client target for SNI
515 std::string client_target;
516 SSLSocketHelper* ssl = nullptr;
517 Queue* event_queue = nullptr,
518 * warn_queue = nullptr;
519
520 // issue #3633: HTTP encoding to assume
521 std::string assume_http_encoding = "ISO-8859-1";
522
523 // socket buffer for buffered reads
524 char rbuf[DEFAULT_SOCKET_BUFSIZE];
525
526 // content types that imply UTF-8 character encoding
527 typedef std::set<std::string> strset_t;
528 strset_t utf8_content_type_set = {
529 "application/ecmascript",
530 "application/json",
531 "application/x-javascript",
532 "application/javascript",
533 "text/javascript", // <- this is the correct MIME type for JavaScript
534 "application/ld+json",
535 "application/yaml", // <- this is the correct MIME type for YAML
536 "application/x-yaml",
537 "text/yaml",
538 };
539
540 // current buffer size
541 size_t buflen = 0,
542 bufoffset = 0;
543
544 int64 tl_warning_us = 0; // timeout threshold for network action warning in microseconds
545 double tp_warning_bs = 0; // throughput warning threshold in B/s
546 int64 tp_bytes_sent = 0, // throughput: bytes sent
547 tp_bytes_recv = 0, // throughput: bytes received
548 tp_us_sent = 0, // throughput: time sending
549 tp_us_recv = 0, // throughput: time receiving
550 tp_us_min = 0 // throughput: minimum time for transfer to be considered
551 ;
552
554 QoreValue warn_callback_arg;
556 QoreValue event_arg;
557 bool del = false,
558 http_exp_chunked_body = false,
559 ssl_accept_all_certs = false,
560 ssl_capture_remote_cert = false,
561 event_data = false,
562 sse_got_cr = false;
563 int in_op = -1,
564 ssl_verify_mode = SSL_VERIFY_NONE;
565
566 // issue #3512: the remote certificate captured
567 QoreObject* remote_cert = nullptr;
568
569 // issue #3818: verbose certificate verification error info
570 QoreStringNode* ssl_err_str = nullptr;
571
572 DLLLOCAL qore_socket_private(int n_sock = QORE_INVALID_SOCKET, int n_sfamily = AF_UNSPEC,
573 int n_stype = SOCK_STREAM, int n_prot = 0, const QoreEncoding* n_enc = QCS_DEFAULT) :
574 sock(n_sock), sfamily(n_sfamily), port(-1), stype(n_stype), sprot(n_prot), enc(n_enc) {
575 }
576
577 DLLLOCAL ~qore_socket_private() {
578 close_internal();
579
580 // must be dereferenced and removed before deleting
581 assert(!event_queue);
582 assert(!warn_queue);
583 }
584
585 DLLLOCAL bool isOpen() {
586 return sock != QORE_INVALID_SOCKET;
587 }
588
589 DLLLOCAL int close() {
590 return close_internal();
591 }
592
593 DLLLOCAL int close_and_reset() {
594 assert(sock != QORE_INVALID_SOCKET);
595 int rc;
596 while (true) {
597#ifdef _Q_WINDOWS
598 rc = ::closesocket(sock);
599#else
600 rc = ::close(sock);
601#endif
602 // try again if close was interrupted by a signal
603 if (!rc || sock_get_error() != EINTR)
604 break;
605 }
606 //printd(5, "qore_socket_private::close_and_reset(this: %p) close(%d) returned %d\n", this, sock, rc);
607 sock = QORE_INVALID_SOCKET;
608 if (buflen) {
609 buflen = 0;
610 }
611 if (bufoffset) {
612 bufoffset = 0;
613 }
614 if (del) {
615 del = false;
616 }
617 if (port != -1) {
618 port = -1;
619 }
620 if (in_op >= 0) {
621 in_op = -1;
622 }
623 if (http_exp_chunked_body) {
624 http_exp_chunked_body = false;
625 }
626 if (sse_got_cr) {
627 sse_got_cr = false;
628 }
629 sfamily = AF_UNSPEC;
630 stype = SOCK_STREAM;
631 sprot = 0;
632 // issue #3053: clear hostname for SNI
633 client_target.clear();
634 return rc;
635 }
636
637 DLLLOCAL int close_internal() {
638 //printd(5, "qore_socket_private::close_internal(this: %p) sock: %d\n", this, sock);
639 if (ssl_err_str) {
640 ssl_err_str->deref();
641 ssl_err_str = nullptr;
642 }
643 if (remote_cert) {
644 remote_cert->deref(nullptr);
645 remote_cert = nullptr;
646 }
647 if (sock >= 0) {
648 // if an SSL connection has been established, shut it down first
649 if (ssl) {
650 ssl->shutdown();
651 ssl->deref();
652 ssl = nullptr;
653 }
654
655 if (!socketname.empty()) {
656 if (del)
657 unlink(socketname.c_str());
658 socketname.clear();
659 }
660 do_close_event();
661 // issue #3558: increment the connection sequence here. so the connection sequence is different as soon as
662 // it's closed
663 ++connection_id;
664
665 return close_and_reset();
666 } else {
667 return 0;
668 }
669 }
670
671 DLLLOCAL void setAssumedEncoding(const char* str) {
672 assume_http_encoding = str;
673 }
674
675 DLLLOCAL const char* getAssumedEncoding() const {
676 return assume_http_encoding.c_str();
677 }
678
679 DLLLOCAL int getSendTimeout() const {
680 hashdecl timeval tv;
681
682#if defined(HPUX) && defined(__ia64) && defined(__LP64__)
683 // on HPUX 64-bit the OS defines socklen_t to be 8 bytes
684 // but the library expects a 32-bit value
685 int size = sizeof(hashdecl timeval);
686#else
687 socklen_t size = sizeof(hashdecl timeval);
688#endif
689
690 if (getsockopt(sock, SOL_SOCKET, SO_SNDTIMEO, (GETSOCKOPT_ARG_4)&tv, (socklen_t *)&size))
691 return -1;
692
693 return tv.tv_sec * 1000 + tv.tv_usec / 1000;
694 }
695
696 DLLLOCAL int getRecvTimeout() const {
697 hashdecl timeval tv;
698
699#if defined(HPUX) && defined(__ia64) && defined(__LP64__)
700 // on HPUX 64-bit the OS defines socklen_t to be 8 bytes
701 // but the library expects a 32-bit value
702 int size = sizeof(hashdecl timeval);
703#else
704 socklen_t size = sizeof(hashdecl timeval);
705#endif
706
707 if (getsockopt(sock, SOL_SOCKET, SO_RCVTIMEO, (GETSOCKOPT_ARG_4)&tv, (socklen_t *)&size))
708 return -1;
709
710 return tv.tv_sec * 1000 + tv.tv_usec / 1000;
711 }
712
713 DLLLOCAL int getPort() {
714 // if we don't need to find out what port we are, then return current value
715 if (sock == QORE_INVALID_SOCKET || (sfamily != AF_INET && sfamily != AF_INET6) || port > 0)
716 return port;
717
718 // otherwise find out what port we're connected to
719 hashdecl sockaddr_storage addr;
720#if defined(HPUX) && defined(__ia64) && defined(__LP64__)
721 // on HPUX 64-bit the OS defines socklen_t to be 8 bytes, but the library expects a 32-bit value
722 int size = sizeof addr;
723#else
724 socklen_t size = sizeof addr;
725#endif
726
727 if (getsockname(sock, (struct sockaddr *)&addr, (socklen_t *)&size) < 0)
728 return -1;
729
730 port = q_get_port_from_addr((const struct sockaddr *)&addr);
731 return port;
732 }
733
734 DLLLOCAL static void do_header(const char* key, QoreString& hdr, const QoreValue& v) {
735 switch (v.getType()) {
736 case NT_STRING:
737 hdr.sprintf("%s: %s\r\n", key, v.get<const QoreStringNode>()->c_str());
738 break;
739 case NT_INT:
740 hdr.sprintf("%s: " QLLD "\r\n", key, v.getAsBigInt());
741 break;
742 case NT_FLOAT: {
743 hdr.sprintf("%s: ", key);
744 size_t offset = hdr.size();
745 hdr.sprintf("%f\r\n", v.getAsFloat());
746 // issue 1556: external modules that call setlocale() can change
747 // the decimal point character used here from '.' to ','
748 // only search the double added, QoreString::sprintf() concatenates
749 q_fix_decimal(&hdr, offset);
750 break;
751 }
752 case NT_NUMBER:
753 hdr.sprintf("%s: ", key);
754 v.get<const QoreNumberNode>()->toString(hdr);
755 hdr.concat("\r\n");
756 break;
757 case NT_BOOLEAN:
758 hdr.sprintf("%s: %d\r\n", key, (int)v.getAsBool());
759 break;
760 }
761 }
762
763 // issue #3879: must add Content-Length if not present, even if there is no message body
766 DLLLOCAL static void do_headers(QoreString& hdr, const QoreHashNode* headers, size_t size, bool addsize = true) {
767 // RFC-2616 4.4 (http://tools.ietf.org/html/rfc2616#section-4.4)
768 // add Content-Length: 0 to headers for responses without a body where there is no transfer-encoding
769 if (headers) {
770 ConstHashIterator hi(headers);
771
772 while (hi.next()) {
773 const QoreValue v = hi.get();
774 const char* key = hi.getKey();
775 if (!size && addsize) {
776 if (!strcasecmp(key, "transfer-encoding")) {
777 addsize = false;
778 } else if (!strcasecmp(key, "content-type")
779 && (v.getType() == NT_STRING)
780 && (*v.get<const QoreStringNode>() == "text/event-stream")) {
781 addsize = false;
782 }
783 }
784 if ((addsize || size) && !strcasecmp(key, "content-length")) {
785 // ignore Content-Length given manually
786 continue;
787 }
788 if (v.getType() == NT_LIST) {
789 ConstListIterator li(v.get<const QoreListNode>());
790 while (li.next())
791 do_header(key, hdr, li.getValue());
792 } else
793 do_header(key, hdr, v);
794 }
795 }
796 // add data and content-length header if necessary
797 if (size || addsize) {
798 hdr.sprintf("Content-Length: %zu\r\n", size);
799 //printd(5, "qore_socket_private::do_headers() added Content-Length: %zu\n", size);
800 }
801
802 hdr.concat("\r\n");
803 }
804
805 DLLLOCAL int listen(int backlog = 20) {
806 if (sock == QORE_INVALID_SOCKET)
807 return QSE_NOT_OPEN;
808 if (in_op >= 0)
809 return QSE_IN_OP;
810#ifdef _Q_WINDOWS
811 if (::listen(sock, backlog)) {
812 // set errno
813 sock_get_error();
814 return -1;
815 }
816 return 0;
817#else
818 return ::listen(sock, backlog);
819#endif
820 }
821
822 DLLLOCAL int accept_intern(ExceptionSink* xsink, struct sockaddr *addr, socklen_t *size, int timeout_ms = -1) {
823 //printd(5, "qore_socket_private::accept_intern() to: %d\n", timeout_ms);
824 assert(xsink);
825 while (true) {
826 if (timeout_ms >= 0 && !isDataAvailable(timeout_ms, "accept", xsink)) {
827 if (*xsink)
828 return -1;
829 // do not throw exception here, NOTHING will be returned in Qore on timeout
830 return QSE_TIMEOUT; // -3
831 }
832
833 int rc = ::accept(sock, addr, size);
834 if (rc != QORE_INVALID_SOCKET)
835 return rc;
836
837 // retry if interrupted by a signal
838 if (sock_get_error() == EINTR)
839 continue;
840
841 qore_socket_error(xsink, "SOCKET-ACCEPT-ERROR", "error in accept()", 0, 0, 0, addr);
842 return -1;
843 }
844 }
845
846 // returns a new socket
847 DLLLOCAL int accept_internal(ExceptionSink* xsink, SocketSource* source, int timeout_ms = -1) {
848 assert(xsink);
849 if (sock == QORE_INVALID_SOCKET) {
850 xsink->raiseException("SOCKET-NOT-OPEN", "socket must be opened, bound, and in a listening state before "
851 "new connections can be accepted");
852 return QSE_NOT_OPEN;
853 }
854 if (in_op >= 0) {
855 if (in_op == q_gettid()) {
856 se_in_op("Socket", "accept", xsink);
857 return QSE_IN_OP;
858 }
859 se_in_op_thread("Socket", "accept", xsink);
860 return QSE_IN_OP_THREAD;
861 }
862
863 int rc;
864 if (sfamily == AF_UNIX) {
865#ifdef _Q_WINDOWS
866 xsink->raiseException("SOCKET-ACCEPT-ERROR", "UNIX sockets are not available under Windows");
867 return -1;
868#else
869 hashdecl sockaddr_un addr_un;
870
871#if defined(HPUX) && defined(__ia64) && defined(__LP64__)
872 // on HPUX 64-bit the OS defines socklen_t to be 8 bytes
873 // but the library expects a 32-bit value
874 int size = sizeof(hashdecl sockaddr_un);
875#else
876 socklen_t size = sizeof(hashdecl sockaddr_un);
877#endif
878 rc = accept_intern(xsink, (struct sockaddr *)&addr_un, (socklen_t *)&size, timeout_ms);
879 //printd(1, "qore_socket_private::accept_internal() " QSD " bytes returned\n", size);
880
881 if (rc >= 0 && source) {
882 QoreStringNode* addr = new QoreStringNode(enc);
883 addr->sprintf("UNIX socket: %s", socketname.c_str());
884 source->priv->setAddress(addr);
885 source->priv->setHostName("localhost");
886 }
887#endif // windows
888 } else if (sfamily == AF_INET || sfamily == AF_INET6) {
889 hashdecl sockaddr_storage addr_in;
890#if defined(HPUX) && defined(__ia64) && defined(__LP64__)
891 // on HPUX 64-bit the OS defines socklen_t to be 8 bytes
892 // but the library expects a 32-bit value
893 int size = sizeof(addr_in);
894#else
895 socklen_t size = sizeof(addr_in);
896#endif
897
898 rc = accept_intern(xsink, (struct sockaddr *)&addr_in, (socklen_t *)&size, timeout_ms);
899 //printd(1, "qore_socket_private::accept_internal() rc: %d, %d bytes returned\n", rc, size);
900
901 if (rc >= 0 && source) {
902 char host[NI_MAXHOST + 1];
903 char service[NI_MAXSERV + 1];
904
905 if (!getnameinfo((struct sockaddr *)&addr_in, qore_get_in_len((struct sockaddr *)&addr_in), host,
906 sizeof(host), service, sizeof(service), NI_NUMERICSERV)) {
907 source->priv->setHostName(host);
908 }
909
910 // get ipv4 or ipv6 address
911 char ifname[INET6_ADDRSTRLEN];
912 if (inet_ntop(addr_in.ss_family, qore_get_in_addr((struct sockaddr *)&addr_in), ifname,
913 sizeof(ifname))) {
914 //printd(5, "inet_ntop() '%s' host: '%s'\n", ifname, host);
915 source->priv->setAddress(ifname);
916 }
917 }
918 } else {
919 // should not happen
920 xsink->raiseException("SOCKET-ACCEPT-ERROR", "do not know how to accept connections with address "
921 "family %d", sfamily);
922 rc = -1;
923 }
924 return rc;
925 }
926
927 DLLLOCAL QoreHashNode* getEvent(int event, int source = QORE_SOURCE_SOCKET) const {
928 QoreHashNode* h = new QoreHashNode(autoTypeInfo);
929 if (event_arg) {
930 h->setKeyValue("arg", event_arg.refSelf(), nullptr);
931 }
932
933 h->setKeyValue("event", event, nullptr);
934 h->setKeyValue("source", source, nullptr);
935 h->setKeyValue("id", (int64)this, nullptr);
936
937 return h;
938 }
939
940 DLLLOCAL void cleanup(ExceptionSink* xsink) {
941 if (event_queue) {
942 // close the socket before the delete message is put on the queue
943 // the socket would be closed anyway in the destructor
944 close_internal();
945
946 event_queue->pushAndTakeRef(getEvent(QORE_EVENT_DELETED));
947
948 // deref and remove event queue
949 event_queue->deref(xsink);
950 event_queue = nullptr;
951 }
952 if (warn_queue) {
953 warn_queue->deref(xsink);
954 warn_queue = nullptr;
955 if (warn_callback_arg) {
956 warn_callback_arg.discard(xsink);
957 warn_callback_arg.clear();
958 }
959 }
960 }
961
962 DLLLOCAL void setEventQueue(ExceptionSink* xsink, Queue* q, QoreValue arg, bool with_data) {
963 if (event_queue) {
964 if (event_arg) {
965 event_arg.discard(xsink);
966 }
967 event_queue->deref(xsink);
968 }
969 event_queue = q;
970 event_arg = arg;
971 event_data = with_data;
972 }
973
974 DLLLOCAL void do_start_ssl_event() {
975 if (event_queue) {
976 event_queue->pushAndTakeRef(getEvent(QORE_EVENT_START_SSL));
977 }
978 }
979
980 DLLLOCAL void do_ssl_established_event() {
981 if (event_queue) {
982 QoreHashNode* h = getEvent(QORE_EVENT_SSL_ESTABLISHED);
983 h->setKeyValue("cipher", new QoreStringNode(ssl->getCipherName()), nullptr);
984 h->setKeyValue("cipher_version", new QoreStringNode(ssl->getCipherVersion()), nullptr);
985 event_queue->pushAndTakeRef(h);
986 }
987 }
988
989 DLLLOCAL void do_connect_event(int af, const struct sockaddr* addr, const char* target,
990 const char* service = nullptr, int prt = -1) {
991 if (event_queue) {
992 QoreHashNode* h = getEvent(QORE_EVENT_CONNECTING);
993 QoreStringNode* str = q_addr_to_string2(addr);
994 if (str) {
995 h->setKeyValue("address", str, nullptr);
996 } else {
997 h->setKeyValue("error", q_strerror(sock_get_error()), nullptr);
998 }
999 q_af_to_hash(af, *h, nullptr);
1000 h->setKeyValue("target", new QoreStringNode(target), nullptr);
1001 if (service)
1002 h->setKeyValue("service", new QoreStringNode(service), nullptr);
1003 if (prt != -1)
1004 h->setKeyValue("port", prt, nullptr);
1005 event_queue->pushAndTakeRef(h);
1006 }
1007 }
1008
1009 DLLLOCAL void do_connected_event() {
1010 if (event_queue) {
1011 event_queue->pushAndTakeRef(getEvent(QORE_EVENT_CONNECTED));
1012 }
1013 }
1014
1015 DLLLOCAL void do_data_event_intern(int event, int source, const QoreStringNode& str) const {
1016 assert(event_queue && event_data && str.size());
1017 ReferenceHolder<QoreHashNode> h(getEvent(event, source), nullptr);
1018 h->setKeyValue("data", str.refSelf(), nullptr);
1019 event_queue->pushAndTakeRef(h.release());
1020 }
1021
1022 DLLLOCAL void do_data_event(int event, int source, const QoreStringNode& str) const {
1023 if (event_queue && event_data && str.size()) {
1024 do_data_event_intern(event, source, str);
1025 }
1026 }
1027
1028 DLLLOCAL void do_data_event(int event, int source, const BinaryNode& b) const {
1029 if (event_queue && event_data && b.size()) {
1030 ReferenceHolder<QoreHashNode> h(getEvent(event, source), nullptr);
1031 h->setKeyValue("data", b.refSelf(), nullptr);
1032 event_queue->pushAndTakeRef(h.release());
1033 }
1034 }
1035
1036 DLLLOCAL void do_data_event(int event, int source, const void* data, size_t size) const {
1037 if (event_queue && event_data && size) {
1038 ReferenceHolder<QoreHashNode> h(getEvent(event, source), nullptr);
1040 b->append(data, size);
1041 h->setKeyValue("data", b.release(), nullptr);
1042 event_queue->pushAndTakeRef(h.release());
1043 }
1044 }
1045
1046 DLLLOCAL void do_header_event(int event, int source, const QoreHashNode& hdr) const {
1047 if (event_queue && event_data && !hdr.empty()) {
1048 ReferenceHolder<QoreHashNode> h(getEvent(event, source), nullptr);
1049 h->setKeyValue("headers", hdr.refSelf(), nullptr);
1050 event_queue->pushAndTakeRef(h.release());
1051 }
1052 }
1053
1054 DLLLOCAL void do_chunked_read(int event, size_t bytes, size_t total_read, int source) {
1055 if (event_queue) {
1056 QoreHashNode* h = getEvent(event, source);
1057 if (event == QORE_EVENT_HTTP_CHUNKED_DATA_RECEIVED)
1058 h->setKeyValue("read", bytes, nullptr);
1059 else
1060 h->setKeyValue("size", bytes, nullptr);
1061 h->setKeyValue("total_read", total_read, nullptr);
1062 event_queue->pushAndTakeRef(h);
1063 }
1064 }
1065
1066 DLLLOCAL void do_read_http_header(int event, const QoreHashNode* headers, int source) {
1067 if (event_queue) {
1068 QoreHashNode* h = getEvent(event, source);
1069 h->setKeyValue("headers", headers->hashRefSelf(), nullptr);
1070 event_queue->pushAndTakeRef(h);
1071 }
1072 }
1073
1074 DLLLOCAL void do_send_http_message_event(const QoreString& str, const QoreHashNode* headers, int source) {
1075 if (event_queue) {
1076 QoreHashNode* h = getEvent(QORE_EVENT_HTTP_SEND_MESSAGE, source);
1077 h->setKeyValue("message", new QoreStringNode(str), nullptr);
1078 //printd(5, "do_send_http_message_event() str='%s' headers: %p (%d %s)\n", str.c_str(), headers, headers->getType(), headers->getTypeName());
1079 h->setKeyValue("headers", headers->copy(), nullptr);
1080 event_queue->pushAndTakeRef(h);
1081 }
1082 }
1083
1084 DLLLOCAL void do_close_event() {
1085 if (event_queue) {
1086 event_queue->pushAndTakeRef(getEvent(QORE_EVENT_CHANNEL_CLOSED));
1087 }
1088 }
1089
1090 DLLLOCAL void do_read_event(size_t bytes_read, size_t total_read, size_t bufsize = 0, int source = QORE_SOURCE_SOCKET) {
1091 // post bytes read on event queue, if any
1092 if (event_queue) {
1093 QoreHashNode* h = getEvent(QORE_EVENT_PACKET_READ, source);
1094 h->setKeyValue("read", bytes_read, nullptr);
1095 h->setKeyValue("total_read", total_read, nullptr);
1096 // set total bytes to read and remaining bytes if bufsize > 0
1097 if (bufsize > 0)
1098 h->setKeyValue("total_to_read", bufsize, nullptr);
1099 event_queue->pushAndTakeRef(h);
1100 }
1101 }
1102
1103 DLLLOCAL void do_send_event(int bytes_sent, int total_sent, int bufsize) {
1104 // post bytes sent on event queue, if any
1105 if (event_queue) {
1106 QoreHashNode* h = getEvent(QORE_EVENT_PACKET_SENT);
1107 h->setKeyValue("sent", bytes_sent, nullptr);
1108 h->setKeyValue("total_sent", total_sent, nullptr);
1109 h->setKeyValue("total_to_send", bufsize, nullptr);
1110 event_queue->pushAndTakeRef(h);
1111 }
1112 }
1113
1114 DLLLOCAL void do_resolve_event(const char* host, const char* service = 0) {
1115 // post bytes sent on event queue, if any
1116 if (event_queue) {
1117 QoreHashNode* h = getEvent(QORE_EVENT_HOSTNAME_LOOKUP);
1118 if (host)
1119 h->setKeyValue("name", new QoreStringNode(host), nullptr);
1120 if (service)
1121 h->setKeyValue("service", new QoreStringNode(service), nullptr);
1122 event_queue->pushAndTakeRef(h);
1123 }
1124 }
1125
1126 DLLLOCAL void do_resolved_event(const struct sockaddr* addr) {
1127 // post bytes sent on event queue, if any
1128 if (event_queue) {
1129 QoreHashNode* h = getEvent(QORE_EVENT_HOSTNAME_RESOLVED);
1130 QoreStringNode* str = q_addr_to_string2(addr);
1131 if (str)
1132 h->setKeyValue("address", str, nullptr);
1133 else
1134 h->setKeyValue("error", q_strerror(sock_get_error()), nullptr);
1135 int prt = q_get_port_from_addr(addr);
1136 if (prt > 0)
1137 h->setKeyValue("port", prt, nullptr);
1138 q_af_to_hash(addr->sa_family, *h, nullptr);
1139 event_queue->pushAndTakeRef(h);
1140 }
1141 }
1142
1143 DLLLOCAL int64 getObjectIDForEvents() const {
1144 return (int64)this;
1145 }
1146
// Connects to the UNIX domain socket at path p using the given socket type and protocol.
// Any currently-open socket is closed first. On Windows builds an exception is raised and -1 is returned.
// Returns 0 on success; on failure an error is reported through xsink and -1 is returned.
1147 DLLLOCAL int connectUNIX(const char* p, int sock_type, int protocol, ExceptionSink* xsink) {
1148 assert(xsink);
1149 assert(p);
1150 QORE_TRACE("connectUNIX()");
1151
1152#ifdef _Q_WINDOWS
1153 xsink->raiseException("SOCKET-CONNECTUNIX-ERROR", "UNIX sockets are not available under Windows");
1154 return -1;
1155#else
1156 // close socket if already open
1157 close();
1158
1159 printd(5, "qore_socket_private::connectUNIX(%s)\n", p);
1160
1161 hashdecl sockaddr_un addr;
1162
1163 addr.sun_family = AF_UNIX;
1164 // copy path and terminate if necessary
// a path longer than sun_path is silently truncated here
1165 strncpy(addr.sun_path, p, sizeof(addr.sun_path) - 1);
1166 addr.sun_path[sizeof(addr.sun_path) - 1] = '\0';
1167 if ((sock = socket(AF_UNIX, sock_type, protocol)) == QORE_SOCKET_ERROR) {
1168 xsink->raiseErrnoException("SOCKET-CONNECT-ERROR", errno, "error connecting to UNIX socket: '%s'", p);
1169 return -1;
1170 }
1171
// the connect event is posted before the connection attempt is made
1172 do_connect_event(AF_UNIX, (sockaddr*)&addr, p);
1173 while (true) {
1174 if (!::connect(sock, (const sockaddr *)&addr, sizeof(struct sockaddr_un)))
1175 break;
1176
1177 // try again if we were interrupted by a signal
1178 if (sock_get_error() == EINTR)
1179 continue;
1180
1181 // otherwise close the socket and return an exception with the error code
1182 // do not have to worry about windows API calls here; this is a UNIX-only function
1183 close_and_reset();
1184 qore_socket_error(xsink, "SOCKET-CONNECT-ERROR", "error in connect()", 0, p);
1185
1186 return -1;
1187 }
1188
1189 // save file name for deleting when socket is closed
1190 socketname = addr.sun_path;
1191 sfamily = AF_UNIX;
1192
1193 do_connected_event();
1194
1195 return 0;
1196#endif // windows
1197 }
1198
1199 // socket must be open or -1 is returned and a Qore-language exception is raised
1200 /* return values:
1201 -1: error
1202 0: timeout
1203 > 0: I/O can continue
1204 */
1205 DLLLOCAL int asyncIoWait(int timeout_ms, bool read, bool write, const char* cname, const char* mname,
1206 ExceptionSink* xsink) const {
1207 assert(xsink);
1208 assert(read || write);
1209 if (sock == QORE_INVALID_SOCKET) {
1210 se_not_open(cname, mname, xsink, "asyncIoWait");
1211 return -1;
1212 }
1213
1214 return asyncIoWait(timeout_ms, read, write, xsink);
1215 }
1216
// Waits up to timeout_ms for the socket to become ready for reading and/or writing.
// Dispatches to poll_intern() when HAVE_POLL is defined, otherwise to select_intern() when HAVE_SELECT is
// defined; compilation fails if neither is available.
// This variant does not itself check that the socket is open.
1217 DLLLOCAL int asyncIoWait(int timeout_ms, bool read, bool write, ExceptionSink* xsink) const {
1218 assert(xsink);
1219#if defined HAVE_POLL
1220 return poll_intern(xsink, timeout_ms, read, write);
1221#elif defined HAVE_SELECT
1222 return select_intern(xsink, timeout_ms, read, write);
1223#else
1224#error no async socket operations supported
1225#endif
1226 }
1227
1228#if defined HAVE_POLL
// Waits on the socket with poll(2) for POLLIN (when read is set) and/or POLLOUT (when write is set).
// The call is repeated when it is interrupted by a signal (EINTR).
// Returns the poll(2) result; a negative result is additionally reported through xsink.
1229 DLLLOCAL int poll_intern(ExceptionSink* xsink, int timeout_ms, bool read, bool write) const {
1230 int rc;
1231 short arg = 0;
1232 if (read)
1233 arg |= POLLIN;
1234 if (write)
1235 arg |= POLLOUT;
1236 pollfd fds = {sock, arg, 0};
1237 while (true) {
1238 rc = ::poll(&fds, 1, timeout_ms);
1239 if (rc == -1 && errno == EINTR)
1240 continue;
1241 break;
1242 }
1243 if (rc < 0)
1244 qore_socket_error(xsink, "SOCKET-SELECT-ERROR", "poll(2) returned an error");
// NOTE(review): this check is only evaluated when rc == 0, i.e. when poll(2) reported no events for the
// descriptor, so revents is presumably always 0 here and the branch looks unreachable - confirm the intent
1245 else if (!rc && ((fds.revents & POLLHUP) || (fds.revents & (POLLERR|POLLNVAL))))
1246 rc = -1;
1247
1248 return rc;
1249 }
1250#elif defined HAVE_SELECT
1251 DLLLOCAL int select_intern(ExceptionSink* xsink, int timeout_ms, bool read, bool write) const {
1252 bool aborted = false;
1253 int rc = select_intern(xsink, timeout_ms, read, write, aborted);
1254 if (rc != QORE_SOCKET_ERROR && aborted)
1255 rc = -1;
1256 return rc;
1257 }
1258
1259 DLLLOCAL int select_intern(ExceptionSink* xsink, int timeout_ms, bool read, bool write, bool& aborted) const {
1260 assert(xsink);
1261 assert(!aborted);
1262 // windows does not use FD_SETSIZE to limit the value of the highest socket descriptor in the set
1263 // instead it has a maximum of 64 sockets in the set; we only need one anyway
1264#ifndef _Q_WINDOWS
1265 // select is inherently broken since it can only handle descriptors < FD_SETSIZE, which is 1024 on Linux for example
1266 if (sock >= FD_SETSIZE) {
1267 xsink->raiseException("SOCKET-SELECT-ERROR", "fd is %d which is >= %d; contact the Qore developers to implement an alternative to select() on this platform", sock, FD_SETSIZE);
1268 return -1;
1269 }
1270#endif
1271 hashdecl timeval tv;
1272 int rc;
1273 while (true) {
1274 // to be safe, we set the file descriptor arg after each EINTR (required on Linux for example)
1275 fd_set sfs, err;
1276
1277 FD_ZERO(&sfs);
1278 FD_ZERO(&err);
1279 FD_SET(sock, &sfs);
1280 FD_SET(sock, &err);
1281
1282 tv.tv_sec = timeout_ms / 1000;
1283 tv.tv_usec = (timeout_ms % 1000) * 1000;
1284
1285 fd_set* readfd = read ? &sfs : 0;
1286 fd_set* writefd = write ? &sfs : 0;
1287
1288 rc = select(sock + 1, readfd, writefd, &err, &tv);
1289 //printd(5, "select_intern() rc: %d err: %d\n", rc, FD_ISSET(sock, &err));
1290 if (rc != QORE_SOCKET_ERROR) {
1291 if (FD_ISSET(sock, &err))
1292 aborted = true;
1293 break;
1294 }
1295 if (sock_get_error() != EINTR)
1296 break;
1297 }
1298 if (rc == QORE_SOCKET_ERROR) {
1299 // do not close the socket here, even in case of EBADF, just return an error
1300 rc = 0;
1301 qore_socket_error(xsink, "SOCKET-SELECT-ERROR", "select(2) returned an error");
1302 }
1303
1304 return rc;
1305 }
1306#endif
1307
1308 DLLLOCAL bool tryReadSocketData(const char* mname, ExceptionSink* xsink) {
1309 assert(xsink);
1310 assert(!buflen);
1311 if (!ssl) {
1312 // issue #3564: see if any data is available on the socket
1313 return asyncIoWait(0, true, false, "Socket", mname, xsink);
1314 }
1315 // select can return true if there is protocol negotiation data available,
1316 // so we try to peek 1 byte of application data with a timeout of 0 with the SSL connection
1317 int rc = ssl->doSSLRW(xsink, mname, rbuf, 1, 0, PEEK, false);
1318 if (*xsink || (rc == QSE_TIMEOUT)) {
1319 return false;
1320 }
1321 return rc > 0 ? true : false;
1322 }
1323
1324 DLLLOCAL bool isSocketDataAvailable(int timeout_ms, const char* mname, ExceptionSink* xsink) {
1325 return asyncIoWait(timeout_ms, true, false, "Socket", mname, xsink);
1326 }
1327
1328 DLLLOCAL bool isDataAvailable(int timeout_ms, const char* mname, ExceptionSink* xsink) {
1329 if (buflen)
1330 return true;
1331 return isSocketDataAvailable(timeout_ms, mname, xsink);
1332 }
1333
1334 DLLLOCAL bool isWriteFinished(int timeout_ms, const char* mname, ExceptionSink* xsink) {
1335 return asyncIoWait(timeout_ms, false, true, "Socket", mname, xsink);
1336 }
1337
1338 DLLLOCAL int close_and_exit() {
1339 if (sock != QORE_INVALID_SOCKET)
1340 close_and_reset();
1341 return -1;
1342 }
1343
// Performs connect() on a socket for which a timeout applies and waits up to timeout_ms for the
// connection to complete.
// Returns 0 when the connection is established and -1 otherwise. When only_timeout is true, some failures
// return -1 without raising an exception on xsink (see the individual branches below).
1344 DLLLOCAL int connectINETTimeout(int timeout_ms, const struct sockaddr* ai_addr, size_t ai_addrlen,
1345 ExceptionSink* xsink, bool only_timeout) {
1346 assert(xsink);
1347 PrivateQoreSocketTimeoutHelper toh(this, "connect");
1348
1349 while (true) {
1350 if (!::connect(sock, ai_addr, ai_addrlen))
1351 return 0;
1352
1353#ifdef _Q_WINDOWS
1354 if (sock_get_error() != EAGAIN) {
1355 qore_socket_error(xsink, "SOCKET-CONNECT-ERROR", "error in connect()", 0, 0, 0, ai_addr);
1356 break;
1357 }
1358#else
1359 // try again if we were interrupted by a signal
1360 if (errno == EINTR)
1361 continue;
1362
// any error other than EINPROGRESS is a failure; -1 is returned without raising an exception here
1363 if (errno != EINPROGRESS)
1364 break;
1365#endif
1366
1367 //printd(5, "qore_socket_private::connectINETTimeout() timeout_ms: %d errno: %d\n", timeout_ms, errno);
1368
1369 // check for timeout or connection with EINPROGRESS
// every branch of this inner loop returns, so it executes at most once
1370 while (true) {
1371#ifdef _Q_WINDOWS
1372 bool aborted = false;
1373 int rc = select_intern(xsink, timeout_ms, false, true, aborted);
1374
1375 //printd(5, "qore_socket_private::connectINETTimeout() timeout_ms: %d rc: %d aborted: %d\n",
1376 // timeout_ms, rc, aborted);
1377
1378 // windows select() returns an error in the error socket set instead of an WSAECONNREFUSED error like
1379 // UNIX, so we simulate it here
1380 if (rc != QORE_SOCKET_ERROR && aborted) {
1381 qore_socket_error(xsink, "SOCKET-CONNECT-ERROR", "error in connect()", 0, 0, 0, ai_addr);
1382 return -1;
1383 }
1384#else
1385 int rc = asyncIoWait(timeout_ms, false, true, "Socket", "connectINETTimeout", xsink);
1386#endif
1387 if (*xsink)
1388 return -1;
1389
1390 //printd(5, "asyncIoWait(%d) returned %d\n", timeout_ms, rc);
1391 if (rc == QORE_SOCKET_ERROR && sock_get_error() != EINTR) {
1392 if (!only_timeout)
1393 qore_socket_error(xsink, "SOCKET-CONNECT-ERROR", "error in asyncIoWait() with "
1394 "Socket::connect() with timeout", 0, 0, 0, ai_addr);
1395 return -1;
1396 } else if (rc > 0) {
// the socket is writable: the result of the connection attempt is read from the socket
1397 return checkConnected(xsink, nullptr, ai_addr, only_timeout);
1398 } else {
// all remaining results are reported as a connection timeout, regardless of only_timeout
1399 SimpleRefHolder<QoreStringNode> desc(new QoreStringNodeMaker("timeout in connection after %dms",
1400 timeout_ms));
1401 concat_target(*(*desc), ai_addr);
1402 xsink->raiseException("SOCKET-CONNECT-ERROR", desc.release());
1403 return -1;
1404 }
1405 }
1406 }
1407
1408 return -1;
1409 }
1410
1411 DLLLOCAL int checkConnected(ExceptionSink* xsink, const char* hostsvc, const struct sockaddr* ai_addr = nullptr,
1412 bool only_timeout = false) {
1413 assert(sock);
1414
1415 // socket selected for write
1416 socklen_t lon = sizeof(int);
1417 int val;
1418
1419 if (getsockopt(sock, SOL_SOCKET, SO_ERROR, (GETSOCKOPT_ARG_4)(&val), &lon) == QORE_SOCKET_ERROR) {
1420 if (!only_timeout) {
1421 qore_socket_error(xsink, "SOCKET-CONNECT-ERROR", "error in getsockopt()", nullptr, hostsvc, nullptr,
1422 ai_addr);
1423 }
1424 return -1;
1425 }
1426
1427 if (val) {
1428 if (only_timeout) {
1429 errno = val;
1430 return -1;
1431 }
1432 qore_socket_error_intern(val, xsink, "SOCKET-CONNECT-ERROR", "error in getsockopt()", nullptr, hostsvc,
1433 nullptr, ai_addr);
1434 return -1;
1435 }
1436
1437 // connected successfully within the timeout period
1438 return 0;
1439 }
1440
1441 DLLLOCAL void confirmConnected(const char* host) {
1442 do_connected_event();
1443
1444 // issue #3053: save hostname for SNI
1445 if (host) {
1446 client_target = host;
1447 }
1448 }
1449
1450 DLLLOCAL int sock_errno_err(const char* err, const char* desc, ExceptionSink* xsink) {
1451 //sock = QORE_INVALID_SOCKET;
1452 qore_socket_error(xsink, err, desc);
1453 return -1;
1454 }
1455
// Switches the socket between non-blocking (non_blocking == true) and blocking mode.
// Uses ioctlsocket(FIONBIO) on Windows and fcntl() with O_NONBLOCK elsewhere.
// Returns 0 on success and -1 on error; errors from the system calls are reported through xsink.
1456 DLLLOCAL int set_non_blocking(bool non_blocking, ExceptionSink* xsink) {
1457 assert(xsink);
1458 // ignore call when socket already closed
1459 if (sock == QORE_INVALID_SOCKET) {
// an exception is expected to be active already in this case, so none is raised here
1460 assert(*xsink);
1461 return -1;
1462 }
1463
1464#ifdef _Q_WINDOWS
1465 u_long mode = non_blocking ? 1 : 0;
1466 int rc = ioctlsocket(sock, FIONBIO, &mode);
1467 if (check_windows_rc(rc)) {
1468 return sock_errno_err("SOCKET-CONNECT-ERROR", "error in ioctlsocket(FIONBIO)", xsink);
1469 }
1470#else
1471 int arg;
1472
1473 // get socket descriptor status flags
1474 if ((arg = fcntl(sock, F_GETFL, 0)) < 0) {
1475 return sock_errno_err("SOCKET-CONNECT-ERROR", "error in fcntl() getting socket descriptor status "
1476 "flag", xsink);
1477 }
1478
// only the O_NONBLOCK bit is changed; all other status flags are preserved
1479 if (non_blocking) { // set non-blocking
1480 arg |= O_NONBLOCK;
1481 } else { // set blocking
1482 arg &= ~O_NONBLOCK;
1483 }
1484
1485 if (fcntl(sock, F_SETFL, arg) < 0) {
1486 return sock_errno_err("SOCKET-CONNECT-ERROR", "error in fcntl() setting socket descriptor status "
1487 "flag", xsink);
1488 }
1489#endif
1490 //printd(5, "qore_socket_private::set_non_blocking() set: %d\n", non_blocking);
1491
1492 return 0;
1493 }
1494
1495 DLLLOCAL int connectINET(const char* host, const char* service, int timeout_ms, ExceptionSink* xsink,
1496 int family = AF_UNSPEC, int type = SOCK_STREAM, int protocol = 0) {
1497 assert(xsink);
1498 family = q_get_af(family);
1499 type = q_get_sock_type(type);
1500
1501 QORE_TRACE("qore_socket_private::connectINET()");
1502
1503 // close socket if already open
1504 close();
1505
1506 printd(5, "qore_socket_private::connectINET(%s:%s, %dms)\n", host, service, timeout_ms);
1507
1508 do_resolve_event(host, service);
1509
1510 QoreAddrInfo ai;
1511 if (ai.getInfo(xsink, host, service, family, 0, type, protocol))
1512 return -1;
1513
1514 hashdecl addrinfo* aip = ai.getAddrInfo();
1515
1516 // emit all "resolved" events
1517 if (event_queue)
1518 for (struct addrinfo* p = aip; p; p = p->ai_next)
1519 do_resolved_event(p->ai_addr);
1520
1521 int prt = q_get_port_from_addr(aip->ai_addr);
1522
1523 for (struct addrinfo* p = aip; p; p = p->ai_next) {
1524 if (!connectINETIntern(host, service, p->ai_family, p->ai_addr, p->ai_addrlen, p->ai_socktype,
1525 p->ai_protocol, prt, timeout_ms, xsink, true)) {
1526 return 0;
1527 }
1528 if (*xsink) {
1529 break;
1530 }
1531 }
1532
1533 if (!*xsink) {
1534 qore_socket_error(xsink, "SOCKET-CONNECT-ERROR", "error in connect()", 0, host, service);
1535 }
1536 return -1;
1537 }
1538
// Creates a socket for the given address family, type and protocol and connects it to ai_addr.
// With timeout_ms >= 0 the connection is made in non-blocking mode with a timeout and the socket is
// switched back to blocking mode afterwards; otherwise a blocking connect() is used and repeated on EINTR.
// On success the socket family, type, protocol and port are stored and 0 is returned; on failure the socket
// is closed and -1 is returned.
1539 DLLLOCAL int connectINETIntern(const char* host, const char* service, int ai_family, struct sockaddr* ai_addr,
1540 size_t ai_addrlen, int ai_socktype, int ai_protocol, int prt, int timeout_ms, ExceptionSink* xsink,
1541 bool only_timeout = false) {
1542 assert(xsink);
1543 printd(5, "qore_socket_private::connectINETIntern() host: %s service: %s family: %d timeout_ms: %d\n", host,
1544 service, ai_family, timeout_ms);
1545 if ((sock = socket(ai_family, ai_socktype, ai_protocol)) == QORE_INVALID_SOCKET) {
1546 xsink->raiseErrnoException("SOCKET-CONNECT-ERROR", errno, "cannot establish a connection to %s:%s", host,
1547 service);
1548 return -1;
1549 }
1550
1551 //printd(5, "qore_socket_private::connectINETIntern(this: %p, host: '%s', port: %d, timeout_ms: %d) "
1552 // "sock: %d\n", this, host, port, timeout_ms, sock);
1553
1554 int rc;
1555
1556 // perform connect with timeout if a non-negative timeout was passed
1557 if (timeout_ms >= 0) {
1558 // set non-blocking
1559 if (set_non_blocking(true, xsink))
1560 return close_and_exit();
1561
1562 do_connect_event(ai_family, ai_addr, host, service, prt);
1563
1564 rc = connectINETTimeout(timeout_ms, ai_addr, ai_addrlen, xsink, only_timeout);
1565 //printd(5, "qore_socket_private::connectINETIntern() errno: %d rc: %d, xsink: %d\n", errno, rc, xsink && *xsink);
1566
1567 // set blocking
// blocking mode is restored whether or not the connection attempt succeeded
1568 if (set_non_blocking(false, xsink))
1569 return close_and_exit();
1570 } else {
1571 do_connect_event(ai_family, ai_addr, host, service, prt);
1572
1573 while (true) {
1574 rc = ::connect(sock, ai_addr, ai_addrlen);
1575
1576 // try again if rc == -1 and errno == EINTR
1577 if (!rc || sock_get_error() != EINTR)
1578 break;
1579 }
1580 }
1581
1582 if (rc < 0) {
// with only_timeout set, an error is only reported here when errno is ETIMEDOUT
1583 if (!only_timeout || errno == ETIMEDOUT)
1584 qore_socket_error(xsink, "SOCKET-CONNECT-ERROR", "error in connect()", 0, host, service);
1585
1586 return close_and_exit();
1587 }
1588
1589 sfamily = ai_family;
1590 stype = ai_socktype;
1591 sprot = ai_protocol;
1592 port = prt;
1593 //printd(5, "qore_socket_private::connectINETIntern(this: %p, host='%s', port: %d, timeout_ms: %d) success, "
1594 // "rc: %d, sock: %d\n", this, host, port, timeout_ms, rc, sock);
1595
1596 confirmConnected(host);
1597 return 0;
1598 }
1599
1600 DLLLOCAL int upgradeClientToSSLIntern(ExceptionSink* xsink, const char* mname, const char* sni_target_host,
1601 int timeout_ms, QoreSSLCertificate* cert = nullptr, QoreSSLPrivateKey* pkey = nullptr) {
1602 assert(!ssl);
1603 SSLSocketHelperHelper sshh(this, true);
1604
1605 int rc;
1606 do_start_ssl_event();
1607 // issue #3053: send target hostname to support SNI
1608 if (!sni_target_host && !client_target.empty()) {
1609 sni_target_host = client_target.c_str();
1610 }
1611 if ((rc = ssl->setClient(xsink, mname, sni_target_host, sock, cert, pkey))
1612 || ssl->connect(mname, timeout_ms,
1613 xsink)) {
1614 sshh.error();
1615 return rc ? rc : -1;
1616 }
1617 do_ssl_established_event();
1618
1619 return 0;
1620 }
1621
1622 DLLLOCAL int upgradeServerToSSLIntern(ExceptionSink* xsink, const char* mname, int timeout_ms,
1623 QoreSSLCertificate* cert = nullptr, QoreSSLPrivateKey* pkey = nullptr) {
1624 assert(!ssl);
1625 //printd(5, "qore_socket_private::upgradeServerToSSLIntern() this: %p mode: %d\n", this, ssl_verify_mode);
1626 SSLSocketHelperHelper sshh(this, true);
1627
1628 do_start_ssl_event();
1629 if (ssl->setServer(xsink, mname, sock, cert, pkey) || ssl->accept(mname, timeout_ms, xsink)) {
1630 sshh.error();
1631 return -1;
1632 }
1633 do_ssl_established_event();
1634
1635 return 0;
1636 }
1637
1638 // returns 0 = success, -1 = error
1639 DLLLOCAL int openUNIX(int sock_type = SOCK_STREAM, int protocol = 0) {
1640 if (sock != QORE_INVALID_SOCKET)
1641 close();
1642
1643 if ((sock = socket(AF_UNIX, sock_type, protocol)) == QORE_INVALID_SOCKET) {
1644 return -1;
1645 }
1646
1647 sfamily = AF_UNIX;
1648 stype = sock_type;
1649 sprot = protocol;
1650 port = -1;
1651 return 0;
1652 }
1653
1654 // returns 0 = success, -1 = error
1655 DLLLOCAL int openINET(int family = AF_INET, int sock_type = SOCK_STREAM, int protocol = 0) {
1656 if (sock != QORE_INVALID_SOCKET)
1657 close();
1658
1659 if ((sock = socket(family, sock_type, protocol)) == QORE_INVALID_SOCKET)
1660 return -1;
1661
1662 sfamily = family;
1663 stype = sock_type;
1664 sprot = protocol;
1665 port = -1;
1666 return 0;
1667 }
1668
1669 DLLLOCAL int reuse(int opt) {
1670 //printf("qore_socket_private::reuse(%s)\n", opt ? "true" : "false");
1671 return setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (SETSOCKOPT_ARG_4)&opt, sizeof(int));
1672 }
1673
1674 // the only place where xsink is optional
// Binds the socket to the given address after applying the SO_REUSEADDR setting.
// When bind() fails, the error is reported through xsink (if given), the socket is closed and -1 is returned.
// On success the port is set from prt when it is non-zero; otherwise it is read back from the bound socket
// (-1 when getsockname() fails), and 0 is returned.
1675 DLLLOCAL int bindIntern(struct sockaddr* ai_addr, size_t ai_addrlen, int prt, bool reuseaddr, ExceptionSink* xsink = 0) {
// the result of setting SO_REUSEADDR is not checked
1676 reuse(reuseaddr);
1677
1678 if ((::bind(sock, ai_addr, ai_addrlen)) == QORE_SOCKET_ERROR) {
1679 if (xsink)
1680 qore_socket_error(xsink, "SOCKET-BIND-ERROR", "error in bind()", 0, 0, 0, ai_addr);
1681 close();
1682 return -1;
1683 }
1684
1685 // set port number
1686 if (prt)
1687 port = prt;
1688 else {
1689 // get port number
1690#if defined(HPUX) && defined(__ia64) && defined(__LP64__)
1691 // on HPUX 64-bit the OS defines socklen_t to be 8 bytes, but the library expects a 32-bit value
1692 int len = ai_addrlen;
1693#else
1694 socklen_t len = ai_addrlen;
1695#endif
1696
// note that getsockname() overwrites the caller's ai_addr buffer
1697 if (getsockname(sock, ai_addr, &len))
1698 port = -1;
1699 else
1700 port = q_get_port_from_addr(ai_addr);
1701 }
1702 return 0;
1703 }
1704
1705 // bind to UNIX domain socket file
// Opens a UNIX domain socket and binds it to the socket file given by name.
// Any currently-open socket is closed first. On Windows builds an exception is raised and -1 is returned.
// On success the socket file name is saved, the file is marked for deletion on close and 0 is returned;
// on failure an error is reported through xsink and -1 is returned.
1706 DLLLOCAL int bindUNIX(ExceptionSink* xsink, const char* name, int socktype = SOCK_STREAM, int protocol = 0) {
1707 assert(xsink);
1708#ifdef _Q_WINDOWS
1709 xsink->raiseException("SOCKET-BINDUNIX-ERROR", "UNIX sockets are not available under Windows");
1710 return -1;
1711#else
1712 close();
1713
1714 // try to open socket if necessary
1715 if (openUNIX(socktype, protocol)) {
1716 xsink->raiseErrnoException("SOCKET-BIND-ERROR", errno, "error opening UNIX socket ('%s') for bind", name);
1717 return -1;
1718 }
1719
1720 hashdecl sockaddr_un addr;
1721 addr.sun_family = AF_UNIX;
1722 // copy path and terminate if necessary
// a name longer than sun_path is silently truncated here
1723 strncpy(addr.sun_path, name, sizeof(addr.sun_path) - 1);
1724 addr.sun_path[sizeof(addr.sun_path) - 1] = '\0';
1725
// a port value of -1 is passed, which bindIntern() stores as the port
1726 if (bindIntern((sockaddr*)&addr, sizeof(struct sockaddr_un), -1, false, xsink))
1727 return -1;
1728
1729 // save socket file name for deleting on close
1730 socketname = addr.sun_path;
1731 // delete UNIX domain socket on close
1732 del = true;
1733 return 0;
1734#endif // windows
1735 }
1736
// Resolves name/service (with AI_PASSIVE) and binds a socket to the first resolved address that can be bound.
// Any currently-open socket is closed first; a single socket is opened using the family and socket type of
// the first resolved address and the protocol argument.
// Returns 0 on success; otherwise an exception is raised on xsink and -1 is returned.
1737 DLLLOCAL int bindINET(ExceptionSink* xsink, const char* name, const char* service, bool reuseaddr = true, int family = AF_UNSPEC, int socktype = SOCK_STREAM, int protocol = 0) {
1738 assert(xsink);
1739 family = q_get_af(family);
1740 socktype = q_get_sock_type(socktype);
1741
1742 close();
1743
1744 QoreAddrInfo ai;
1745 do_resolve_event(name, service);
1746 if (ai.getInfo(xsink, name, service, family, AI_PASSIVE, socktype, protocol))
1747 return -1;
1748
1749 hashdecl addrinfo* aip = ai.getAddrInfo();
1750 // first emit all "resolved" events
1751 if (event_queue)
1752 for (struct addrinfo* p = aip; p; p = p->ai_next)
1753 do_resolved_event(p->ai_addr);
1754
1755 // try to open socket if necessary
1756 if (openINET(aip->ai_family, aip->ai_socktype, protocol)) {
1757 qore_socket_error(xsink, "SOCKET-BINDINET-ERROR", "error opening socket for bind", 0, name, service);
1758 return -1;
1759 }
1760
1761 int prt = q_get_port_from_addr(aip->ai_addr);
1762
1763 int en = 0;
1764 // iterate through addresses and bind to the first interface possible
// bindIntern() is called without an exception sink here, so individual failures raise no exception
// NOTE(review): bindIntern() calls close() when bind() fails, so later iterations presumably operate on a
// closed socket, and `en` may then hold an error from close() or a later attempt instead of the first
// bind() failure - verify
1765 for (struct addrinfo* p = aip; p; p = p->ai_next) {
1766 if (!bindIntern(p->ai_addr, p->ai_addrlen, prt, reuseaddr)) {
1767 //printd(5, "qore_socket_private::bindINET(family: %d) bound: name: %s service: %s f: %d st: %d p: %d\n", family, name ? name : "(null)", service ? service : "(null)", p->ai_family, p->ai_socktype, p->ai_protocol);
1768 return 0;
1769 }
1770
1771 en = sock_get_raw_error();
1772 //printd(5, "qore_socket_private::bindINET() failed to bind: name: %s service: %s f: %d st: %d p: %d, errno: %d (%s)\n", name ? name : "(null)", service ? service : "(null)", p->ai_family, p->ai_socktype, p->ai_protocol, en, strerror(en));
1773 }
1774
1775 // if no bind was possible, then raise an exception
1776 qore_socket_error_intern(en, xsink, "SOCKET-BIND-ERROR", "error binding on socket", 0, name, service);
1777 return -1;
1778 }
1779
1780 // only called from qore-bound code - always with xsink
1781 DLLLOCAL QoreHashNode* getPeerInfo(ExceptionSink* xsink, bool host_lookup = true) const {
1782 assert(xsink);
1783 if (sock == QORE_INVALID_SOCKET) {
1784 se_not_open("Socket", "getPeerInfo", xsink);
1785 return 0;
1786 }
1787
1788 hashdecl sockaddr_storage addr;
1789 socklen_t len = sizeof addr;
1790 if (getpeername(sock, (struct sockaddr*)&addr, &len)) {
1791 qore_socket_error(xsink, "SOCKET-GETPEERINFO-ERROR", "error in getpeername()");
1792 return 0;
1793 }
1794
1795 return getAddrInfo(addr, len, host_lookup);
1796 }
1797
1798 // only called from qore-bound code - always with xsink
// Returns a hash describing the local address of the socket as built by getAddrInfo().
// Returns null with an error reported through xsink when the socket is not open or getsockname() fails.
1799 DLLLOCAL QoreHashNode* getSocketInfo(ExceptionSink* xsink, bool host_lookup = true) const {
1800 assert(xsink);
1801 if (sock == QORE_INVALID_SOCKET) {
1802 se_not_open("Socket", "getSocketInfo", xsink);
1803 return 0;
1804 }
1805
1806 hashdecl sockaddr_storage addr;
1807#if defined(HPUX) && defined(__ia64) && defined(__LP64__)
1808 // on HPUX 64-bit the OS defines socklen_t to be 8 bytes, but the library expects a 32-bit value
1809 int len = sizeof addr;
1810#else
1811 socklen_t len = sizeof addr;
1812#endif
1813
1814 if (getsockname(sock, (struct sockaddr*)&addr, &len)) {
1815 qore_socket_error(xsink, "SOCKET-GETSOCKETINFO-ERROR", "error in getsockname()");
1816 return 0;
1817 }
1818
1819 return getAddrInfo(addr, len, host_lookup);
1820 }
1821
// Builds a new hash describing the given socket address.
// For AF_INET / AF_INET6 addresses the hash gets "hostname" and "hostname_desc" (only when host_lookup is
// true and the name lookup succeeds), "address" and "address_desc" (when the address can be converted to a
// string) and "port". For AF_UNIX addresses (non-Windows builds) "address" and "address_desc" are set from
// the saved socket file name. "family" and "familystr" are always set.
// The len argument is not used in this function.
1822 DLLLOCAL QoreHashNode* getAddrInfo(const struct sockaddr_storage& addr, socklen_t len, bool host_lookup = true) const {
1823 QoreHashNode* h = new QoreHashNode(autoTypeInfo);
1824
1825 if (addr.ss_family == AF_INET || addr.ss_family == AF_INET6) {
1826 if (host_lookup) {
1827 char host[NI_MAXHOST + 1];
1828
// getnameinfo() returns 0 on success; the hostname keys are omitted when the lookup fails
1829 if (!getnameinfo((struct sockaddr*)&addr, qore_get_in_len((struct sockaddr*)&addr), host, sizeof(host), 0, 0, 0)) {
1830 QoreStringNode* hoststr = new QoreStringNode(host);
1831 h->setKeyValue("hostname", hoststr, 0);
1832 h->setKeyValue("hostname_desc", QoreAddrInfo::getAddressDesc(addr.ss_family, hoststr->c_str()), 0);
1833 }
1834 }
1835
1836 // get ipv4 or ipv6 address
1837 char ifname[INET6_ADDRSTRLEN];
1838 if (inet_ntop(addr.ss_family, qore_get_in_addr((struct sockaddr*)&addr), ifname, sizeof(ifname))) {
1839 QoreStringNode* addrstr = new QoreStringNode(ifname);
1840 h->setKeyValue("address", addrstr, 0);
1841 h->setKeyValue("address_desc", QoreAddrInfo::getAddressDesc(addr.ss_family, addrstr->c_str()), 0);
1842 }
1843
// convert the port from network byte order to host byte order
1844 int tport;
1845 if (addr.ss_family == AF_INET) {
1846 hashdecl sockaddr_in* s = (hashdecl sockaddr_in*)&addr;
1847 tport = ntohs(s->sin_port);
1848 } else {
1849 hashdecl sockaddr_in6* s = (hashdecl sockaddr_in6*)&addr;
1850 tport = ntohs(s->sin6_port);
1851 }
1852
1853 h->setKeyValue("port", tport, 0);
1854 }
1855#ifndef _Q_WINDOWS
1856 else if (addr.ss_family == AF_UNIX) {
// the path is taken from the saved socket name rather than from the address structure
1857 assert(!socketname.empty());
1858 QoreStringNode* addrstr = new QoreStringNode(socketname);
1859 h->setKeyValue("address", addrstr, 0);
1860 h->setKeyValue("address_desc", QoreAddrInfo::getAddressDesc(addr.ss_family, addrstr->c_str()), 0);
1861 }
1862#endif
1863
1864 h->setKeyValue("family", addr.ss_family, 0);
1865 h->setKeyValue("familystr", new QoreStringNode(QoreAddrInfo::getFamilyName(addr.ss_family)), 0);
1866
1867 return h;
1868 }
1869
1870 // set backwards-compatible object members on accept
1871 // to be (hopefully) deleted in a future version of qore
// Sets the "source" and "source_host" members on object o from the peer address of this socket.
// Nothing is set when getpeername() fails; for AF_INET / AF_INET6 peers each member is only set when the
// corresponding address conversion or name lookup succeeds.
1872 DLLLOCAL void setAccept(QoreObject* o) {
1873 hashdecl sockaddr_storage addr;
1874
1875 socklen_t len = sizeof addr;
1876 if (getpeername(sock, (struct sockaddr*)&addr, &len))
1877 return;
1878
1879 if (addr.ss_family == AF_INET || addr.ss_family == AF_INET6) {
1880 // get ipv4 or ipv6 address
1881 char ifname[INET6_ADDRSTRLEN];
1882 if (inet_ntop(addr.ss_family, qore_get_in_addr((struct sockaddr *)&addr), ifname, sizeof(ifname))) {
1883 //printd(5, "inet_ntop() '%s' host: '%s'\n", ifname, host);
1884 o->setValue("source", new QoreStringNode(ifname), 0);
1885 }
1886
1887 char host[NI_MAXHOST + 1];
1888 if (!getnameinfo((struct sockaddr *)&addr, qore_get_in_len((struct sockaddr *)&addr), host, sizeof(host),
1889 0, 0, 0)) {
1890 o->setValue("source_host", new QoreStringNode(host), 0);
1891 }
1892 }
1893#ifndef _Q_WINDOWS
1894 else if (addr.ss_family == AF_UNIX) {
// for UNIX domain sockets the source is a description of the socket path and the host is "localhost"
1895 QoreStringNode* astr = new QoreStringNode(enc);
1896 hashdecl sockaddr_un *addr_un = (hashdecl sockaddr_un *)&addr;
1897 astr->sprintf("UNIX socket: %s", addr_un->sun_path);
1898 o->setValue("source", astr, 0);
1899 o->setValue("source_host", new QoreStringNode("localhost"), 0);
1900 }
1901#endif
1902 }
1903
1905
1910 DLLLOCAL int readByteFromBuffer(char& output) {
1911 // must be checked if open/connected before this function is called
1912 assert(sock != QORE_INVALID_SOCKET);
1913
1914 // always returned buffered data first
1915 if (!buflen) {
1916 return -1;
1917 }
1918
1919 output = *(rbuf + bufoffset);
1920 if (buflen == 1) {
1921 buflen = 0;
1922 bufoffset = 0;
1923 } else {
1924 --buflen;
1925 ++bufoffset;
1926 }
1927 return 0;
1928 }
1929
1930 // buffered reads for high performance
// Buffered read: on success sets buf to point into the internal read buffer and returns the number of bytes
// available at buf, which is at most bs.
// Data already in the buffer is returned first without touching the socket. When the buffer is empty, up to
// DEFAULT_SOCKET_BUFSIZE bytes are read into the internal buffer (with recv() or through the SSL helper);
// bytes beyond bs stay buffered for later calls.
// In the non-SSL path a non-negative timeout is enforced before reading, and QSE_TIMEOUT is returned when no
// data arrives in time; a negative timeout skips the wait. With suppress_exception set, the timeout, closed
// and recv() error exceptions raised directly in this function are not raised.
// A read event is posted for newly-read data when do_event is true.
1931 DLLLOCAL ssize_t brecv(ExceptionSink* xsink, const char* meth, char*& buf, size_t bs, int flags,
1932 int timeout, bool do_event = true, bool suppress_exception = false) {
1933 assert(xsink);
1934 // must be checked if open/connected before this function is called
1935 assert(sock != QORE_INVALID_SOCKET);
1936 assert(meth);
1937
1938 // always return buffered data first
1939 if (buflen) {
1940 buf = rbuf + bufoffset;
1941 if (buflen <= bs) {
1942 bs = buflen;
1943 buflen = 0;
1944 bufoffset = 0;
1945 } else {
1946 buflen -= bs;
1947 bufoffset += bs;
1948 }
1949 return (ssize_t)bs;
1950 }
1951
1952 // real socket reads are only done when the buffer is empty
1953
1954 //printd(5, "qore_socket_private::brecv(buf: %p, bs: %d, flags: %d, timeout: %d, do_event: %d) this: %p "
1955 // ssl: %d\n", buf, (int)bs, flags, timeout, (int)do_event, this, ssl);
1956
1957 ssize_t rc;
1958 if (!ssl) {
1959 if (timeout >= 0 && !isDataAvailable(timeout, meth, xsink)) {
1960 if (*xsink) {
1961 return -1;
1962 }
1963 if (!suppress_exception) {
1964 se_timeout("Socket", meth, timeout, xsink);
1965 }
1966 return QSE_TIMEOUT;
1967 }
1968
1969 while (true) {
// errno is cleared so that the checks after the loop do not see a stale value
1970 errno = 0;
1971 rc = ::recv(sock, rbuf, DEFAULT_SOCKET_BUFSIZE, flags);
1972 if (rc == QORE_SOCKET_ERROR) {
1973 sock_get_error();
// try again if we were interrupted by a signal
1974 if (errno == EINTR) {
1975 continue;
1976 }
1977#ifdef ECONNRESET
1978 if (errno == ECONNRESET) {
1979 if (!suppress_exception) {
1980 se_closed("Socket", meth, xsink);
1981 }
1982 close();
1983 } else
1984#endif
1985 if (!suppress_exception) {
1986 qore_socket_error(xsink, "SOCKET-RECV-ERROR", "error in recv()", meth);
1987 }
1988 break;
1989 }
1990 //printd(5, "qore_socket_private::brecv(%d, %p, %ld, %d) rc: %ld errno: %d\n", sock, buf, bs, flags,
1991 // rc, errno);
1992 // leave the loop once recv() has returned without an error
1993 if (rc >= 0) {
1994 break;
1995 }
1996 }
1997 } else {
1998 rc = ssl->read(xsink, meth, rbuf, DEFAULT_SOCKET_BUFSIZE, timeout, suppress_exception);
1999 }
2000
2001 //printd(5, "qore_socket_private::brecv(%d, %p, %ld, %d) rc: %ld errno: %d\n", sock, buf, bs, flags, rc, errno);
2002 if (rc > 0) {
2003 buf = rbuf;
2004 assert(!buflen);
2005 assert(!bufoffset);
// keep any bytes beyond the requested size in the buffer for later calls
2006 if (rc > (ssize_t)bs) {
2007 buflen = rc - bs;
2008 bufoffset = bs;
2009 rc = bs;
2010 }
2011
2012 // register event
2013 if (do_event) {
2014 do_read_event(rc, rc);
2015 }
2016 } else {
2017 // only close the socket if no data was received and the error is not EAGAIN or EINPROGRESS
2018 if (!rc && isOpen() && errno != EAGAIN && errno != EINPROGRESS) {
2019 close();
2020 }
2021 }
2022
2023 return rc;
2024 }
2025
// Reads an HTTP header block from the socket one byte at a time until a terminating blank line is read.
// Returns the header text, finished with a single '\n', on success; rc receives the result of the last
// read. Returns null when the socket is not open (rc is set to QSE_NOT_OPEN), when a read fails or times
// out, when the connection is closed before the header is complete, or when the header reaches
// QORE_MAX_HEADER_SIZE bytes; an exception is raised on xsink in these cases.
// With exit_early set, null is also returned, without an exception being raised here, when the data starts
// with "\r\n" before any header text has been read.
// NOTE(review): `hdr` is used below but its declaration is not visible in this listing (the line numbering
// skips a line after the declaration of `state`) - confirm against the original source.
2027 DLLLOCAL QoreStringNode* readHTTPData(ExceptionSink* xsink, const char* meth, int timeout, ssize_t& rc,
2028 bool exit_early = false) {
2029 assert(xsink);
2030 assert(meth);
2031 if (sock == QORE_INVALID_SOCKET) {
2032 se_not_open("Socket", meth, xsink, "readHTTPData");
2033 rc = QSE_NOT_OPEN;
2034 return 0;
2035 }
2036
2037 PrivateQoreSocketThroughputHelper th(this, false);
2038
2039 // state:
// -1 = no end-of-line characters are pending
2040 // 0 = '\r' received
2041 // 1 = '\r\n' received
2042 // 2 = '\r\n\r' received
2043 // 3 = '\n' received
2044 // read in HTTP header until \r\n\r\n or \n\n from socket
2045 int state = -1;
2047
2048 size_t count = 0;
2049
2050 while (true) {
2051 char* buf;
2052 rc = brecv(xsink, meth, buf, 1, 0, timeout, false);
2053 //printd(5, "qore_socket_private::readHTTPData() this: %p Socket::%s(): rc: %zd read char: %c (%03d) "
2054 // "(old state: %d)\n", this, meth, rc, rc > 0 && buf[0] > 31 ? buf[0] : '?', rc > 0 ? buf[0] : 0, state);
2055 if (rc <= 0) {
2056 //printd(5, "qore_socket_private::readHTTPData(timeout: %d) hdr='%s' (len: %d), rc=" QSD ", "
2057 // "errno: %d: '%s'\n", timeout, hdr->c_str(), hdr->strlen(), rc, errno, strerror(errno));
2058
// raise an exception here only when the read itself did not already raise one
2059 if (!*xsink) {
2060 if (!count) {
2061 //printd(5, "qore_socket_private::readHTTPData() this: %p rc: %d count: %d (%d) "
2062 // "timeout: %d\n", this, rc, count, hdr->size(), timeout);
2063 se_closed("Socket", meth, xsink);
2064 } else {
2065 xsink->raiseExceptionArg("SOCKET-HTTP-ERROR", hdr.release(), "socket closed on remote end "
2066 "while reading header data after reading " QSD " byte%s", count, count == 1 ? "" : "s");
2067 }
2068 }
2069 return 0;
2070 }
2071 char c = buf[0];
2072 if (++count == QORE_MAX_HEADER_SIZE) {
2073 xsink->raiseException("SOCKET-HTTP-ERROR", "header size cannot exceed " QSD " bytes", count);
2074 return 0;
2075 }
2076
2077 // check if we can progress to the next state
2078 if (c == '\n') {
2079 if (state == -1) {
2080 state = 3;
2081 continue;
2082 }
2083 if (!state) {
2084 if (exit_early && hdr->empty())
2085 return 0;
2086 state = 1;
2087 continue;
2088 }
// a '\n' in state 1, 2 or 3 completes the blank line that terminates the header
2089 assert(state > 0);
2090 break;
2091 } else if (c == '\r') {
2092 if (state == -1) {
2093 state = 0;
2094 continue;
2095 }
// "\r\r" is also accepted as the end of the header
2096 if (!state)
2097 break;
2098 if (state == 1) {
2099 state = 2;
2100 continue;
2101 }
2102 }
2103
// the pending end-of-line characters did not end the header: add them to the header text before the
// current character
2104 if (state != -1) {
2105 switch (state) {
2106 case 0: hdr->concat('\r'); break;
2107 case 1: hdr->concat("\r\n"); break;
2108 case 2: hdr->concat("\r\n\r"); break;
2109 case 3: hdr->concat('\n'); break;
2110 }
2111 state = -1;
2112 }
2113 hdr->concat(c);
2114 }
// the pending end-of-line characters that ended the header are replaced by a single '\n'
2115 hdr->concat('\n');
2116 //printd(5, "qore_socket_private::readHTTPData(timeout: %d) hdr='%s' (%d)\n", timeout, hdr->c_str(),
2117 // hdr->size());
2118 th.finalize(hdr->size());
2119 return hdr.release();
2120 }
2121
// Reads string data from the socket by calling brecv() in a loop and appending each block to a string.
//
// With bufsize > 0 the loop ends once at least bufsize bytes have been accumulated; the size requested
// from brecv() is reduced as the total approaches bufsize. With bufsize <= 0 blocks of
// DEFAULT_SOCKET_BUFSIZE are requested until brecv() returns <= 0.
//
// xsink:   receives any exception; must not be null (asserted)
// bufsize: number of bytes wanted, or <= 0 for no limit
// timeout: passed through unchanged to brecv()
// rc:      output: QSE_NOT_OPEN if the socket is not open; otherwise the last brecv() result if it was
//          negative, else the number of bytes accumulated. Not assigned when the call is rejected because
//          another operation is in progress (in_op >= 0).
// source:  when > 0, read events and a final QORE_EVENT_SOCKET_DATA_READ data event are raised
//
// Returns the accumulated string released from its holder, or 0 / nullptr if the socket is not open, another
// operation is in progress, or an exception is pending after reading.
//
// NOTE(review): `str` is used below but its declaration is not visible in this listing (source line 2143 is
// absent); presumably a QoreStringNodeHolder wrapping a new string -- confirm against the repository.
2122 DLLLOCAL QoreStringNode* recv(ExceptionSink* xsink, ssize_t bufsize, int timeout, ssize_t& rc,
2123 int source = QORE_SOURCE_SOCKET) {
2124 assert(xsink);
2125 if (sock == QORE_INVALID_SOCKET) {
2126 se_not_open("Socket", "recv", xsink, "recv");
2127 rc = QSE_NOT_OPEN;
2128 return 0;
2129 }
// reject the call if an operation is already in progress (in this thread or another one)
2130 if (in_op >= 0) {
2131 if (in_op == q_gettid()) {
2132 se_in_op("Socket", "recv", xsink);
2133 return 0;
2134 }
2135 se_in_op_thread("Socket", "recv", xsink);
2136 return 0;
2137 }
2138
2139 PrivateQoreSocketThroughputHelper th(this, false);
2140
// block size for each brecv() call: never larger than DEFAULT_SOCKET_BUFSIZE
2141 size_t bs = bufsize > 0 && bufsize < DEFAULT_SOCKET_BUFSIZE ? bufsize : DEFAULT_SOCKET_BUFSIZE;
2142
2144
2145 char* buf;
2146
2147 while (true) {
2148 rc = brecv(xsink, "recv", buf, bs, 0, timeout, false);
2149
2150 if (rc <= 0) {
// add the byte counts to the description of an exception raised by brecv()
2151 if (*xsink) {
2152 xsink->appendLastDescription(" (%zd bytes requested; %zu bytes received)", bs, str->size());
2153 }
2154 printd(5, "qore_socket_private::recv(" QSD ", %d) bs=" QSD ", br=" QSD ", rc=" QSD ", errno: %d "
2155 "(%s)\n", bufsize, timeout, bs, str->size(), rc, errno, strerror(errno));
2156 break;
2157 }
2158
2159 str->concat(buf, rc);
2160
2161 // register event
2162 if (source > 0) {
2163 do_read_event(rc, str->size(), bufsize, source);
2164 }
2165
2166 if (bufsize > 0) {
// stop once the requested amount has been read
2167 if (str->size() >= (size_t)bufsize)
2168 break;
// shrink the next request so that the total does not go past bufsize
2169 if ((bufsize - str->size()) < bs)
2170 bs = bufsize - str->size();
2171 }
2172 }
2173
2174 printd(5, "qore_socket_private::recv() received " QSD " byte(s), bufsize=" QSD ", strlen=" QSD " str='%s'\n",
2175 str->size(), bufsize, (str ? str->strlen() : 0), str ? str->c_str() : "n/a");
2176
2177 // "fix" return code value if no error occurred
2178 if (rc >= 0)
2179 rc = str->size();
2180
2181 th.finalize(str->size());
2182
2183 if (*xsink) {
2184 return nullptr;
2185 }
2186
2187 if (source > 0) {
2188 do_data_event(QORE_EVENT_SOCKET_DATA_READ, source, **str);
2189 }
2190 return str.release();
2191 }
2192
// Reads all string data currently available on the socket.
//
// The first brecv() call uses the caller's timeout; after that, data is read with a zero timeout for as long
// as isDataAvailable(0, ...) reports more data.
//
// xsink:   receives any exception; must not be null (asserted)
// timeout: timeout for the first read only
// rc:      output: QSE_NOT_OPEN if the socket is not open; the brecv() result if the first read returns
//          <= 0; the total number of bytes read on success. Not assigned when the call is rejected because
//          another operation is in progress (in_op >= 0).
// source:  when > 0, a QORE_EVENT_SOCKET_DATA_READ data event is raised with the complete string; the
//          per-block read events are raised regardless of this value
//
// Returns the accumulated string released from its holder, or nullptr if the socket is not open, another
// operation is in progress, the first read returns <= 0, or an exception is pending after reading.
//
// NOTE(review): `str` is used below but its declaration is not visible in this listing (source line 2212 is
// absent); presumably a QoreStringNodeHolder wrapping a new string -- confirm against the repository.
2193 DLLLOCAL QoreStringNode* recvAll(ExceptionSink* xsink, int timeout, ssize_t& rc,
2194 int source = QORE_SOURCE_SOCKET) {
2195 assert(xsink);
2196 if (sock == QORE_INVALID_SOCKET) {
2197 se_not_open("Socket", "recv", xsink, "recvAll");
2198 rc = QSE_NOT_OPEN;
2199 return nullptr;
2200 }
// reject the call if an operation is already in progress (in this thread or another one)
2201 if (in_op >= 0) {
2202 if (in_op == q_gettid()) {
2203 se_in_op("Socket", "recv", xsink);
2204 return nullptr;
2205 }
2206 se_in_op_thread("Socket", "recv", xsink);
2207 return nullptr;
2208 }
2209
2210 PrivateQoreSocketThroughputHelper th(this, false);
2211
2213
2214 // perform first read with timeout
2215 char* buf;
2216 rc = brecv(xsink, "recv", buf, DEFAULT_SOCKET_BUFSIZE, 0, timeout, false);
2217 //printd(5, "qore_socket_private::brecv(to: %d) initial: rc=" QSD "\n", timeout, rc);
2218 if (rc <= 0) {
2219 return nullptr;
2220 }
2221
2222 str->concat(buf, rc);
2223
2224 // register event
2225 do_read_event(rc, rc);
2226
2227 // keep reading data until no more data is available without a timeout
2228 if (isDataAvailable(0, "recv", xsink)) {
2229 do {
2230 rc = brecv(xsink, "recv", buf, DEFAULT_SOCKET_BUFSIZE, 0, 0, false, true);
2231 //printd(5, "qore_socket_private::brecv(to: 0) rc=" QSD " rd=" QSD "\n", rc, str->size());
2232 // if the remote end has closed the connection, return what we have
2233 if (rc <= 0) {
2234 break;
2235 }
2236 str->concat(buf, rc);
2237
2238 // register event
2239 do_read_event(rc, str->size());
2240 } while (isDataAvailable(0, "recv", xsink));
2241 }
2242
2243 th.finalize(str->size());
2244
2245 if (*xsink) {
2246 return nullptr;
2247 }
2248
// report the total number of bytes read to the caller
2249 rc = str->size();
2250 if (source > 0) {
2251 do_data_event(QORE_EVENT_SOCKET_DATA_READ, source, **str);
2252 }
2253 return str.release();
2254 }
2255
// Declaration only: the definition of this overload is not part of this listing.
// NOTE(review): presumably receives `size` bytes from the socket and writes them to file descriptor `fd`,
// using `timeout_ms` as the timeout -- confirm against the definition.
2256 DLLLOCAL int recv(int fd, ssize_t size, int timeout_ms, ExceptionSink* xsink);
2257
// Reads binary data from the socket by calling brecv() in a loop and appending each block to a binary object.
//
// With bufsize > 0 the loop ends once at least bufsize bytes have been accumulated; the size requested
// from brecv() is reduced as the total approaches bufsize. With bufsize <= 0 blocks of
// DEFAULT_SOCKET_BUFSIZE are requested until brecv() returns <= 0.
//
// xsink:   receives any exception; must not be null (asserted)
// bufsize: number of bytes wanted, or <= 0 for no limit
// timeout: passed through unchanged to brecv()
// rc:      output: QSE_NOT_OPEN if the socket is not open; otherwise the last brecv() result, replaced by
//          the number of bytes accumulated when that result is >= 0 and no exception is pending. Not
//          assigned when the call is rejected because another operation is in progress (in_op >= 0).
// source:  when > 0, a QORE_EVENT_SOCKET_DATA_READ data event is raised with the complete data
//
// Returns the accumulated data released from its holder, or 0 / nullptr if the socket is not open, another
// operation is in progress, or an exception is pending after reading.
//
// NOTE(review): `b` is used below but its declaration is not visible in this listing (source line 2279 is
// absent); presumably a reference holder wrapping a new BinaryNode -- confirm against the repository.
2258 DLLLOCAL BinaryNode* recvBinary(ExceptionSink* xsink, ssize_t bufsize, int timeout, ssize_t& rc,
2259 int source = QORE_SOURCE_SOCKET) {
2260 assert(xsink);
2261 if (sock == QORE_INVALID_SOCKET) {
2262 se_not_open("Socket", "recvBinary", xsink, "recvBinary");
2263 rc = QSE_NOT_OPEN;
2264 return 0;
2265 }
// reject the call if an operation is already in progress (in this thread or another one)
2266 if (in_op >= 0) {
2267 if (in_op == q_gettid()) {
2268 se_in_op("Socket", "recvBinary", xsink);
2269 return 0;
2270 }
2271 se_in_op_thread("Socket", "recvBinary", xsink);
2272 return 0;
2273 }
2274
2275 PrivateQoreSocketThroughputHelper th(this, false);
2276
// block size for each brecv() call: never larger than DEFAULT_SOCKET_BUFSIZE
2277 size_t bs = bufsize > 0 && bufsize < DEFAULT_SOCKET_BUFSIZE ? bufsize : DEFAULT_SOCKET_BUFSIZE;
2278
2280
2281 char* buf;
2282 while (true) {
2283 rc = brecv(xsink, "recvBinary", buf, bs, 0, timeout);
2284 if (rc <= 0)
2285 break;
2286
2287 b->append(buf, rc);
2288
2289 if (bufsize > 0) {
// stop once the requested amount has been read
2290 if (b->size() >= (size_t)bufsize)
2291 break;
// shrink the next request so that the total does not go past bufsize
2292 if ((bufsize - b->size()) < bs)
2293 bs = bufsize - b->size();
2294 }
2295 }
2296
2297 th.finalize(b->size());
2298
2299 if (*xsink)
2300 return nullptr;
2301
2302 // "fix" return code value if no error occurred
2303 if (rc >= 0)
2304 rc = b->size();
2305
2306 if (source > 0) {
2307 do_data_event(QORE_EVENT_SOCKET_DATA_READ, source, **b);
2308 }
2309 printd(5, "qore_socket_private::recvBinary() received " QSD " byte(s), bufsize=" QSD ", blen=" QSD "\n",
2310 b->size(), bufsize, b->size());
2311 return b.release();
2312 }
2313
// Reads all binary data currently available on the socket.
//
// The first brecv() call uses the caller's timeout; after that, data is read with a zero timeout for as long
// as isDataAvailable(0, ...) reports more data.
//
// xsink:   receives any exception; must not be null (asserted)
// timeout: timeout for the first read only
// rc:      output: QSE_NOT_OPEN if the socket is not open; the brecv() result if the first read returns
//          <= 0; the total number of bytes read on success. Not assigned when the call is rejected because
//          another operation is in progress (in_op >= 0).
// source:  when > 0, a QORE_EVENT_SOCKET_DATA_READ data event is raised with the complete data; the
//          per-block read events are raised regardless of this value
//
// Returns the accumulated data released from its holder, or 0 / nullptr if the socket is not open, another
// operation is in progress, the first read returns <= 0, or an exception is pending after reading.
//
// NOTE(review): `b` is used below but its declaration is not visible in this listing (source line 2333 is
// absent); presumably a reference holder wrapping a new BinaryNode -- confirm against the repository.
2314 DLLLOCAL BinaryNode* recvBinaryAll(ExceptionSink* xsink, int timeout, ssize_t& rc,
2315 int source = QORE_SOURCE_SOCKET) {
2316 assert(xsink);
2317 if (sock == QORE_INVALID_SOCKET) {
2318 se_not_open("Socket", "recvBinary", xsink, "recvBinaryAll");
2319 rc = QSE_NOT_OPEN;
2320 return 0;
2321 }
// reject the call if an operation is already in progress (in this thread or another one)
2322 if (in_op >= 0) {
2323 if (in_op == q_gettid()) {
2324 se_in_op("Socket", "recvBinary", xsink);
2325 return 0;
2326 }
2327 se_in_op_thread("Socket", "recvBinary", xsink);
2328 return 0;
2329 }
2330
2331 PrivateQoreSocketThroughputHelper th(this, false);
2332
2334
2335 //printd(5, "QoreSocket::recvBinary(%d, " QSD ") this: %p\n", timeout, rc, this);
2336 // perform first read with timeout
2337 char* buf;
2338 rc = brecv(xsink, "recvBinary", buf, DEFAULT_SOCKET_BUFSIZE, 0, timeout, false);
2339 //printd(5, "qore_socket_private::brecv(to: %d) initial: rc=" QSD "\n", timeout, rc);
2340 if (rc <= 0) {
2341 return nullptr;
2342 }
2343
2344 b->append(buf, rc);
2345
2346 // register event
2347 do_read_event(rc, rc);
2348
2349 // keep reading data until no more data is available without a timeout
2350 while (isDataAvailable(0, "recvBinary", xsink)) {
2351 rc = brecv(xsink, "recvBinary", buf, DEFAULT_SOCKET_BUFSIZE, 0, 0, false, true);
2352 //printd(5, "qore_socket_private::brecv(to: 0) rc=" QSD " rd=" QSD "\n", rc, b->size());
2353 // if the remote end has closed the connection, return what we have
2354 if (rc <= 0) {
2355 break;
2356 }
2357
2358 b->append(buf, rc);
2359
2360 // register event
2361 do_read_event(rc, b->size());
2362 }
2363
2364 th.finalize(b->size());
2365
2366 if (*xsink) {
2367 return nullptr;
2368 }
2369
2370 if (source > 0) {
2371 do_data_event(QORE_EVENT_SOCKET_DATA_READ, source, **b);
2372 }
// report the total number of bytes read to the caller
2373 rc = b->size();
2374 //printd(5, "qore_socket_private() this: %p b: %p size: %lld\n", this, b->getPtr(), rc);
2375 return b.release();
2376 }
2377
2378 DLLLOCAL void recvToOutputStream(OutputStream *os, int64 size, int64 timeout, ExceptionSink *xsink,
2379 QoreThreadLock* l, int source = QORE_SOURCE_SOCKET) {
2380 if (sock == QORE_INVALID_SOCKET) {
2381 se_not_open("Socket", "recvToOutputStream", xsink);
2382 return;
2383 }
2384 if (in_op >= 0) {
2385 if (in_op == q_gettid()) {
2386 se_in_op("Socket", "recvToOutputStream", xsink);
2387 return;
2388 }
2389 se_in_op_thread("Socket", "recvToOutputStream", xsink);
2390 return;
2391 }
2392
2393 qore_socket_op_helper oh(this);
2394
2395 char* buf;
2396 ssize_t br = 0;
2397 while (size < 0 || br < size) {
2398 // calculate bytes needed
2399 int bn = size < 0 ? DEFAULT_SOCKET_BUFSIZE : QORE_MIN(size - br, DEFAULT_SOCKET_BUFSIZE);
2400
2401 ssize_t rc = brecv(xsink, "recvToOutputStream", buf, bn, 0, timeout);
2402 if (rc < 0) {
2403 //error - already reported in xsink
2404 return;
2405 }
2406 if (rc == 0) {
2407 //eof
2408 if (size >= 0) {
2409 //not all size bytes were read
2410 xsink->raiseException("SOCKET-RECV-ERROR", "Unexpected end of stream");
2411 }
2412 return;
2413 }
2414
2415 if (source > 0) {
2416 do_data_event(QORE_EVENT_SOCKET_DATA_READ, source, buf, rc);
2417 }
2418
2419 // write buffer to the stream
2420 {
2421 AutoUnlocker al(l);
2422 os->write(buf, rc, xsink);
2423 if (*xsink) {
2424 return;
2425 }
2426 }
2427
2428 br += rc;
2429 }
2430 }
2431
2432 DLLLOCAL QoreStringNode* readHTTPHeaderString(ExceptionSink* xsink, int timeout, int source) {
2433 assert(xsink);
2434 ssize_t rc;
2435 QoreStringNodeHolder hdr(readHTTPData(xsink, "readHTTPHeaderString", timeout, rc));
2436 if (!hdr) {
2437 assert(*xsink);
2438 return 0;
2439 }
2440 assert(rc > 0);
2441 do_data_event(QORE_EVENT_HTTP_HEADERS_READ, source, **hdr);
2442 return hdr.release();
2443 }
2444
2445 DLLLOCAL QoreHashNode* readHTTPHeader(ExceptionSink* xsink, QoreHashNode* info, int timeout,
2446 ssize_t& rc, int source, const char* headers_raw_key = "headers-raw") {
2447 assert(xsink);
2448 QoreStringNodeHolder hdr(readHTTPData(xsink, "readHTTPHeader", timeout, rc));
2449 if (!hdr) {
2450 assert(*xsink);
2451 return nullptr;
2452 }
2453
2454 if (hdr->empty()) {
2455 xsink->raiseException("SOCKET-HTTP-ERROR", "remote closed the connection while reading the HTTP header");
2456 return nullptr;
2457 }
2458 assert(rc > 0);
2459
2460 return processHttpHeaderString(xsink, hdr, info, source, headers_raw_key);
2461 }
2462
// Parses a raw HTTP header block (request or response) into a hash.
//
// The string held by `hdr` is modified in place: the end of the first line -- and, for requests, the spaces
// after the method and after the path -- are overwritten with '\0' so that the pieces can be used as C
// strings.
//
// xsink:           receives any exception ("SOCKET-HTTP-ERROR" when no line ending is found or when the
//                  first line contains no "HTTP/" string; in the latter case `hdr` is released and passed as
//                  the exception argument)
// hdr:             holder of the raw header text
// info:            optional hash; receives the first line under "response-uri" or "request-uri" and, for
//                  requests, the "close" flag returned by convertHeaderToHash()
// source:          event source for the events raised here
// headers_raw_key: passed through to convertHeaderToHash()
//
// Keys set in the returned hash by this method: "http_version" always; "status_code" and "status_message"
// for responses (first line starts with "HTTP/"); "method" and "path" for requests. The remaining header
// lines are processed by convertHeaderToHash().
//
// Returns the hash released from its holder, or nullptr if an exception was raised here.
2464 DLLLOCAL QoreHashNode* processHttpHeaderString(ExceptionSink* xsink, QoreStringNodeHolder& hdr,
2465 QoreHashNode* info, int source, const char* headers_raw_key = "headers-raw") {
2466 const char* buf = hdr->c_str();
// terminate the first line in place; p is left pointing at the start of the remaining header lines
2467 char* p;
2468 if ((p = (char*)strstr(buf, "\r\n"))) {
2469 *p = '\0';
2470 p += 2;
2471 } else if ((p = (char*)strchr(buf, '\n'))) {
2472 *p = '\0';
2473 ++p;
2474 } else if ((p = (char*)strchr(buf, '\r'))) {
2475 *p = '\0';
2476 ++p;
2477 } else {
2478 // readHTTPData will only return a string that satisfies one of the above conditions,
2479 // however an embedded 0 could have been sent which would make the above searches invalid
2480 xsink->raiseException("SOCKET-HTTP-ERROR", "invalid header received with embedded nulls in "
2481 "Socket::readHTTPHeader()");
2482 return nullptr;
2483 }
2484
// the first line must contain the HTTP version marker
2485 char* t1;
2486 if (!(t1 = (char*)strstr(buf, "HTTP/"))) {
2487 xsink->raiseExceptionArg("SOCKET-HTTP-ERROR", hdr.release(), "missing HTTP version string in "
2488 "first header line in Socket::readHTTPHeader()");
2489 return nullptr;
2490 }
2491
2492 ReferenceHolder<QoreHashNode> h(new QoreHashNode(autoTypeInfo), xsink);
2493
2494 // process header flags
2495 int flags = CHF_PROCESS;
2496
2497 // get version
2498 {
// the version is taken as the 3 characters following "HTTP/"
2499 QoreStringNode* hv = new QoreStringNode(t1 + 5, 3, enc);
2500 h->setKeyValue("http_version", hv, nullptr);
2501 if (*hv == "1.1") {
2502 flags |= CHF_HTTP11;
2503 }
2504 }
2505
2506 // if we are getting a response
2507 // key for info if applicable
2508 const char* info_key;
2509 if (t1 == buf) {
// response: "HTTP/" is at the start of the line; look for the space before the status code starting at
// offset 8 (the length of "HTTP/" plus the 3 version characters)
2510 char* t2 = (char*)strchr(buf + 8, ' ');
2511 if (t2) {
2512 t2++;
2513 if (isdigit(*(t2))) {
2514 h->setKeyValue("status_code", atoi(t2), nullptr);
// the status message is taken from 4 characters after the start of the status code
2515 if (strlen(t2) > 4) {
2516 h->setKeyValue("status_message", new QoreStringNode(t2 + 4), nullptr);
2517 }
2518 }
2519 }
2520 // write the status line as the "response-uri" key in the info hash if present
2521 // NOTE: this is not a URI, so the name is not really appropriate
2522 info_key = "response-uri";
2523 } else { // get method and path
2524 char* t2 = (char*)strchr(buf, ' ');
2525 if (t2) {
// terminate the method in place; from here on buf reads as the method only
2526 *t2 = '\0';
2527 h->setKeyValue("method", new QoreStringNode(buf), nullptr);
2528 t2++;
2529 t1 = strchr(t2, ' ');
2530 if (t1) {
2531 *t1 = '\0';
2532 //printd(5, "found path '%s'\n", t2);
2533 // the path is returned as-is with no decodings - use decode_url() to decode
2534 h->setKeyValue("path", new QoreStringNode(t2, enc), nullptr);
2535 }
2536 }
2537 info_key = "request-uri";
2538 flags |= CHF_REQUEST;
2539 }
2540
2541 // write status line or request line to the info hash and raise a data event if applicable
2542 if (info || (event_queue && event_data)) {
// NOTE(review): for requests, buf was cut at the first space above, so this string holds only the method
// rather than the whole request line -- confirm that this is intended
2543 QoreStringNodeHolder status_line(new QoreStringNode(buf));
// take an extra reference when the string is handed to both the event and the info hash
2544 if (info && event_queue && event_data) {
2545 status_line->ref();
2546 }
2547 if (event_queue && event_data) {
2548 do_data_event_intern(QORE_EVENT_SOCKET_DATA_READ, source, **status_line);
2549 }
2550 if (info) {
2551 info->setKeyValue(info_key, *status_line, nullptr);
2552 }
2553 status_line.release();
2554 }
2555
// parse the remaining header lines into the hash
2556 bool close = convertHeaderToHash(*h, p, flags, info, &http_exp_chunked_body, headers_raw_key);
2557 do_read_http_header(QORE_EVENT_HTTP_MESSAGE_RECEIVED, *h, source);
2558
2559 // process header info
2560 if ((flags & CHF_REQUEST) && info) {
2561 info->setKeyValue("close", close, 0);
2562 }
2563
2564 return h.release();
2565 }
2566
2567 // info must be already referenced for the assignment, if present
2568 DLLLOCAL int runHeaderCallback(ExceptionSink* xsink, const char* cname, const char* mname,
2569 const ResolvedCallReferenceNode& callback, QoreThreadLock* l, const QoreHashNode* hdr, QoreHashNode* info,
2570 bool send_aborted = false, QoreObject* obj = nullptr) {
2571 assert(xsink);
2572 assert(obj);
2573 ReferenceHolder<QoreListNode> args(new QoreListNode(autoTypeInfo), xsink);
2574 QoreHashNode* arg = new QoreHashNode(autoTypeInfo);
2575 arg->setKeyValue("hdr", hdr ? hdr->refSelf() : nullptr, xsink);
2576 arg->setKeyValue("info", info, xsink);
2577 if (obj)
2578 arg->setKeyValue("obj", obj->refSelf(), xsink);
2579 arg->setKeyValue("send_aborted", send_aborted, xsink);
2580 args->push(arg, nullptr);
2581
2582 ValueHolder rv(xsink);
2583 return runCallback(xsink, cname, mname, rv, callback, l, *args);
2584 }
2585
// Invokes a trailer callback without arguments and stores a returned hash in `hdr`.
//
// The callback may return a hash (ownership of the returned hash is moved into `hdr`) or no value (`hdr` is
// left unchanged); any other return type raises "HTTP-TRAILER-ERROR".
//
// Returns 0 = OK, -1 = an exception was raised by the callback or because of an invalid return type.
//
// NOTE(review): the second line of the signature (source line 2587) is absent from this listing; the body
// uses `callback`, `l` and `hdr`, so the missing parameters presumably are the call reference, the lock and
// a reference to a QoreHashNode holder -- confirm against the repository.
2586 DLLLOCAL int runTrailerCallback(ExceptionSink* xsink, const char* cname, const char* mname,
2588 ValueHolder rv(xsink);
// the callback is executed without arguments
2589 if (runCallback(xsink, cname, mname, rv, callback, l, nullptr))
2590 return -1;
2591
2592 switch (rv->getType()) {
2593 case NT_NOTHING:
2594 break;
2595 case NT_HASH: {
// move the returned hash out of the value holder into the caller's holder
2596 hdr = rv.release().get<QoreHashNode>();
2597 break;
2598 }
2599 default:
2600 xsink->raiseException("HTTP-TRAILER-ERROR", "chunked callback returned type '%s'; expecting 'hash' "
2601 "or 'NOTHING'", rv->getTypeName());
2602 return -1;
2603 }
2604 return 0;
2605 }
2606
2607 DLLLOCAL int runDataCallback(ExceptionSink* xsink, const char* cname, const char* mname,
2608 const ResolvedCallReferenceNode& callback, QoreThreadLock* l, const AbstractQoreNode* data, bool chunked) {
2609 assert(xsink);
2610 ReferenceHolder<QoreListNode> args(new QoreListNode(autoTypeInfo), xsink);
2611 QoreHashNode* arg = new QoreHashNode(autoTypeInfo);
2612 arg->setKeyValue("data", data->realCopy(), xsink);
2613 arg->setKeyValue("chunked", chunked, xsink);
2614 args->push(arg, nullptr);
2615
2616 ValueHolder rv(xsink);
2617 return runCallback(xsink, cname, mname, rv, callback, l, *args);
2618 }
2619
2620 DLLLOCAL int runCallback(ExceptionSink* xsink, const char* cname, const char* mname, ValueHolder& res,
2621 const ResolvedCallReferenceNode& callback, QoreThreadLock* l, const QoreListNode* args = nullptr) {
2622 assert(xsink);
2623 // FIXME: subtract callback execution time from socket performance measurement
2624
2625 // unlock and execute callback
2626 {
2627 AutoUnlocker al(l);
2628 res = callback.execValue(args, xsink);
2629 }
2630
2631 // check exception and socket status
2632 assert(xsink);
2633 return *xsink ? -1 : 0;
2634 }
2635
// Sends an HTTP chunked message body, obtaining each chunk from a callback.
//
// The callback is executed repeatedly through runCallback() and its return value decides what is sent:
//   - non-empty string or binary: one chunk: the size in hex followed by "\r\n", the data, then "\r\n"
//   - hash: "0\r\n", one header line per hash entry (one per element for list values) built with
//     do_header(), then "\r\n"; this ends the body
//   - empty string, empty binary, no value or NULL: "0\r\n\r\n"; this ends the body
//   - any other type: "SOCKET-CALLBACK-ERROR" is raised
//
// xsink:         receives any exception; must not be null (asserted)
// cname, mname:  class and method name used in exception text
// send_callback: the call reference that provides the data
// l:             lock passed to runCallback()
// source:        event source for the events raised here
// timeout_ms:    send timeout; when >= 0 on a non-SSL connection, non-blocking I/O is enabled for the
//                duration of the call
// aborted:       optional output flag (must be false on entry if given -- asserted); when given, the socket
//                is checked with tryReadSocketData() before each callback call and after a send error, and
//                the flag is set when this method returns early because of that check
//
// Returns QSE_NOT_OPEN if the socket is not open; 0 if the call is rejected because another operation is in
// progress (an exception is raised in that case); -1 if an exception was raised, a send failed or the socket
// was closed; otherwise 0.
2636 DLLLOCAL int sendHttpChunkedWithCallback(ExceptionSink* xsink, const char* cname, const char* mname,
2637 const ResolvedCallReferenceNode& send_callback, QoreThreadLock& l, int source, int timeout_ms = -1,
2638 bool* aborted = nullptr) {
2639 assert(xsink);
2640 assert(!aborted || !(*aborted));
2641
2642 if (sock == QORE_INVALID_SOCKET) {
2643 se_not_open(cname, mname, xsink, "sendHttpChunkedWithCallback");
2644 return QSE_NOT_OPEN;
2645 }
// reject the call if an operation is already in progress (in this thread or another one)
2646 if (in_op >= 0) {
2647 if (in_op == q_gettid()) {
2648 se_in_op(cname, mname, xsink);
2649 return 0;
2650 }
2651 se_in_op_thread(cname, mname, xsink);
2652 return 0;
2653 }
2654
2655 PrivateQoreSocketThroughputHelper th(this, true);
2656
2657 // set the non-blocking flag (for use with non-ssl connections)
2658 bool nb = (timeout_ms >= 0);
2659 // set non-blocking I/O (and restore on exit) if we have a timeout and a non-ssl connection
2660 OptionalNonBlockingHelper onbh(*this, !ssl && nb, xsink);
2661 if (*xsink)
2662 return -1;
2663
2664 qore_socket_op_helper oh(this);
2665
2666 ssize_t rc;
2667 int64 total = 0;
2668 bool done = false;
2669
2670 while (!done) {
2671 // if we have response data already, then we assume an error and abort
2672 if (aborted) {
2673 bool data_available = tryReadSocketData(mname, xsink);
2674 //printd(5, "qore_socket_private::sendHttpChunkedWithCallback() this: %p aborted: %p iDA: %d\n", this, aborted, data_available);
2675 if (data_available || *xsink) {
2676 *aborted = true;
2677 return *xsink ? -1 : 0;
2678 }
2679 }
2680
2681 // FIXME: subtract callback execution time from socket performance measurement
2682 ValueHolder res(xsink);
2683 rc = runCallback(xsink, cname, mname, res, send_callback, &l);
2684 if (rc)
2685 return rc;
2686
2687 //printd(5, "qore_socket_private::sendHttpChunkedWithCallback() this: %p res: %s\n", this, get_type_name(*res));
2688
2689 // check callback return val
// buf receives the chunk size line or the trailer section for this iteration
2690 QoreString buf;
2691 // do not copy data here; set references and send the data directly
2692 const char* data_ptr = nullptr;
2693 size_t data_size = 0;
2694
2695 switch (res->getType()) {
2696 case NT_STRING: {
2697 const QoreStringNode* str = res->get<const QoreStringNode>();
// an empty string ends the body
2698 if (str->empty()) {
2699 done = true;
2700 break;
2701 }
2702 buf.sprintf("%x\r\n", (int)str->size());
2703 data_ptr = str->c_str();
2704 data_size = str->size();
2705 //buf.concat(str->c_str(), str->size());
2706 break;
2707 }
2708
2709 case NT_BINARY: {
2710 const BinaryNode* b = res->get<const BinaryNode>();
// an empty binary object ends the body
2711 if (b->empty()) {
2712 done = true;
2713 break;
2714 }
2715 buf.sprintf("%x\r\n", (int)b->size());
2716 data_ptr = static_cast<const char*>(b->getPtr());
2717 data_size = b->size();
2718 //buf.concat((const char*)b->getPtr(), b->size());
2719 break;
2720 }
2721
2722 case NT_HASH: {
// a hash supplies the trailer headers that follow the last (zero-size) chunk
2723 buf.concat("0\r\n");
2724
2725 const QoreHashNode* h = res->get<const QoreHashNode>();
2726 ConstHashIterator hi(h);
2727 while (hi.next()) {
2728 const QoreValue v = hi.get();
2729 const char* key = hi.getKey();
2730
2731 //printd(5, "qore_socket_private::sendHttpChunkedWithCallback() this: %p trailer %s\n", this, key);
2732
2733 if (v.getType() == NT_LIST) {
2734 ConstListIterator li(v.get<const QoreListNode>());
2735 while (li.next())
2736 do_header(key, buf, li.getValue());
2737 } else
2738 do_header(key, buf, v);
2739 }
2740 // fall through to next case
2741 }
2742
2743 case NT_NOTHING:
2744 case NT_NULL:
2745 done = true;
2746 break;
2747
2748 default:
2749 xsink->raiseException("SOCKET-CALLBACK-ERROR", "HTTP chunked data callback returned type '%s'; expecting one of: 'string', 'binary', 'hash', 'nothing' (or 'NULL')", res->getTypeName());
2750 return -1;
2751 }
2752
2753 // send chunk buffer data
2754 if (!buf.empty()) {
2755 rc = sendIntern(xsink, cname, mname, buf.c_str(), buf.size(), timeout_ms, total, true);
2756 }
2757
2758 if (!*xsink) {
2759 assert(rc >= 0);
2760 // send actual data, if available
2761 if (data_ptr && data_size) {
2762 rc = sendIntern(xsink, cname, mname, data_ptr, data_size, timeout_ms, total, true);
2763 }
2764
2765 if (!*xsink) {
2766 assert(rc >= 0);
// nothing was sent in this iteration: send the complete terminating chunk; otherwise just close the
// chunk or the trailer section with CRLF
2767 if (buf.empty() && (!data_ptr || !data_size)) {
2768 buf.set("0\r\n\r\n");
2769 } else {
2770 buf.set("\r\n");
2771 }
2772 rc = sendIntern(xsink, cname, mname, buf.c_str(), buf.size(), timeout_ms, total, true);
2773 }
2774 }
2775
2776 if (!*xsink) {
2777 // do events
2778 switch (res->getType()) {
2779 case NT_STRING: {
2780 const QoreStringNode* str = res->get<const QoreStringNode>();
2781 if (!str->empty()) {
2782 do_data_event(QORE_EVENT_HTTP_CHUNKED_DATA_SENT, source, *str);
2783 }
2784 break;
2785 }
2786
2787 case NT_BINARY: {
2788 const BinaryNode* b = res->get<const BinaryNode>();
2789 if (!b->empty()) {
2790 do_data_event(QORE_EVENT_HTTP_CHUNKED_DATA_SENT, source, *b);
2791 }
2792 break;
2793 }
2794
2795 case NT_HASH: {
2796 const QoreHashNode* h = res->get<const QoreHashNode>();
2797 do_header_event(QORE_EVENT_HTTP_FOOTERS_SENT, source, *h);
2798 break;
2799 }
2800 }
2801 }
2802
2803 if (rc < 0) {
2804 // if we have a socket I/O error, but also data to be read on the socket, then clear the exception and return 0
// NOTE(review): the exception is not cleared in this block itself; presumably tryReadSocketData() does
// that -- confirm
2805 if (aborted && *xsink) {
2806 bool data_available = tryReadSocketData(mname, xsink);
2807 //printd(5, "qore_socket_private::sendHttpChunkedWithCallback() this: %p aborted: %p iDA: %d\n", this, aborted, data_available);
2808 if (data_available) {
2809 *aborted = true;
2810 return *xsink ? -1 : 0;
2811 }
2812 }
2813
2814 //printd(5, "qore_socket_private::sendHttpChunkedWithCallback() this: %p rc: %d sock: %d xsink: %d\n", this, rc, sock, xsink->isException());
2815 }
2816
2817 //printd(5, "qore_socket_private::sendHttpChunkedWithCallback() this: %p sent: %s\n", this, buf.c_str());
2818
// stop on a send error or if the socket was closed in the meantime
2819 if (rc < 0 || sock == QORE_INVALID_SOCKET)
2820 break;
2821 }
2822
2823 th.finalize(total);
2824
2825 return rc < 0 || sock == QORE_INVALID_SOCKET ? -1 : 0;
2826 }
2827
2828 // return: the number of bytes sent or an error (< 0)
2829 DLLLOCAL ssize_t sendIntern(ExceptionSink* xsink, const char* cname, const char* mname, const char* buf, size_t size,
2830 int timeout_ms, int64& total, bool stream = false) {
2831 assert(xsink);
2832 ssize_t rc;
2833 size_t bs = 0;
2834
2835 // set the non-blocking flag (for use with non-ssl connections)
2836 bool nb = (timeout_ms >= 0);
2837
2838 while (true) {
2839 if (ssl) {
2840 // SSL_MODE_ENABLE_PARTIAL_WRITE is enabled so we can get finer-grained socket events for do_send_event() below
2841 rc = ssl->write(mname, buf + bs, size - bs, timeout_ms, xsink);
2842 } else {
2843 while (true) {
2844 rc = ::send(sock, buf + bs, size - bs, 0);
2845 //printd(5, "qore_socket_private::send() this: %p Socket::%s() buf: %p size: %zu timeout_ms: %d ssl: %p nb: %d bs: %zu" rc: %zd\n", this, mname, buf, size, timeout_ms, ssl, nb, bs, rc);
2846 // try again if we were interrupted by a signal
2847 if (rc >= 0)
2848 break;
2849 sock_get_error();
2850 // check that the send finishes before the timeout if we are using non-blocking I/O
2851 if (nb && (errno == EAGAIN
2852#ifdef EWOULDBLOCK
2853 || errno == EWOULDBLOCK
2854#endif
2855 )) {
2856 if (!isWriteFinished(timeout_ms, mname, xsink)) {
2857 if (*xsink)
2858 return -1;
2859 se_timeout("Socket", mname, timeout_ms, xsink);
2860 rc = QSE_TIMEOUT;
2861 break;
2862 }
2863 continue;
2864 }
2865 if (errno != EINTR) {
2866 //printd(5, "qore_socket_private::send() bs: %ld rc: " QSD " len: " QSD " (total: " QSD ") errno: %d sock: %d\n", bs, rc, size - bs, size, errno, sock);
2867 xsink->raiseErrnoException("SOCKET-SEND-ERROR", errno, "error while executing %s::%s()", cname, mname);
2868
2869 // do not close the socket even if we have EPIPE or ECONNRESET in case there is data to be read when streaming
2870#ifdef EPIPE
2871 if (!stream && errno == EPIPE) {
2872 close();
2873 }
2874#endif
2875#ifdef ECONNRESET
2876 if (!stream && errno == ECONNRESET) {
2877 close();
2878 }
2879#endif
2880 break;
2881 }
2882 }
2883 }
2884
2885 if (rc > 0) {
2886 total += rc;
2887 }
2888 //printd(5, "qore_socket_private::send() bs: %ld rc: " QSD " len: " QSD " (total: " QSD ") errno: %d\n", bs, rc, size - bs, size, errno);
2889 if (rc < 0 || sock == QORE_INVALID_SOCKET) {
2890 break;
2891 }
2892 bs += rc;
2893 do_send_event(rc, bs, size);
2894 if (bs >= size) {
2895 break;
2896 }
2897 }
2898
2899 return (ssize_t)bs;
2900 }
2901
// Declaration only: the definition of this overload is not part of this listing.
// NOTE(review): presumably reads `size` bytes from file descriptor `fd` and sends them on the socket, using
// `timeout_ms` as the timeout -- confirm against the definition.
2902 DLLLOCAL int send(int fd, ssize_t size, int timeout_ms, ExceptionSink* xsink);
2903
2904 // returns: 0 = OK
2905 DLLLOCAL int send(ExceptionSink* xsink, const char* cname, const char* mname, const char* buf, size_t size,
2906 int timeout_ms = -1, int source = QORE_SOURCE_SOCKET) {
2907 assert(xsink);
2908 if (sock == QORE_INVALID_SOCKET) {
2909 se_not_open(cname, mname, xsink, "send");
2910 return QSE_NOT_OPEN;
2911 }
2912 if (in_op >= 0) {
2913 if (in_op == q_gettid()) {
2914 se_in_op(cname, mname, xsink);
2915 return 0;
2916 }
2917 se_in_op_thread(cname, mname, xsink);
2918 return 0;
2919 }
2920 if (!size) {
2921 return 0;
2922 }
2923
2924 PrivateQoreSocketThroughputHelper th(this, true);
2925
2926 // set the non-blocking flag (for use with non-ssl connections)
2927 bool nb = (timeout_ms >= 0);
2928 // set non-blocking I/O (and restore on exit) if we have a timeout and a non-ssl connection
2929 OptionalNonBlockingHelper onbh(*this, !ssl && nb, xsink);
2930 if (*xsink) {
2931 return -1;
2932 }
2933
2934 int64 total = 0;
2935 ssize_t rc = sendIntern(xsink, cname, mname, buf, size, timeout_ms, total);
2936 th.finalize(total);
2937
2938 if (rc > 0 && source > 0) {
2939 do_data_event(QORE_EVENT_SOCKET_DATA_SENT, source, buf, size);
2940 }
2941
2942 return rc < 0 || sock == QORE_INVALID_SOCKET ? rc : 0;
2943 }
2944
2945 DLLLOCAL void sendFromInputStream(InputStream *is, int64 size, int64 timeout, ExceptionSink *xsink,
2946 QoreThreadLock* l) {
2947 if (sock == QORE_INVALID_SOCKET) {
2948 se_not_open("Socket", "sendFromInputStream", xsink);
2949 return;
2950 }
2951 if (in_op >= 0) {
2952 if (in_op == q_gettid()) {
2953 se_in_op("Socket", "sendFromInputStream", xsink);
2954 return;
2955 }
2956 se_in_op_thread("Socket", "sendFromInputStream", xsink);
2957 return;
2958 }
2959
2960 qore_socket_op_helper oh(this);
2961
2962 PrivateQoreSocketThroughputHelper th(this, true);
2963
2964 // set the non-blocking flag (for use with non-ssl connections)
2965 bool nb = (timeout >= 0);
2966 // set non-blocking I/O (and restore on exit) if we have a timeout and a non-ssl connection
2967 OptionalNonBlockingHelper onbh(*this, !ssl && nb, xsink);
2968 if (*xsink)
2969 return;
2970
2971 char buf[DEFAULT_SOCKET_BUFSIZE];
2972 int64 sent = 0;
2973 int64 total = 0;
2974 while (size < 0 || sent < size) {
2975 int64 toRead = size < 0 ? DEFAULT_SOCKET_BUFSIZE : QORE_MIN(size - sent, DEFAULT_SOCKET_BUFSIZE);
2976 int64 r;
2977 {
2978 AutoUnlocker al(l);
2979 r = is->read(buf, toRead, xsink);
2980 if (*xsink) {
2981 return;
2982 }
2983 }
2984 if (r == 0) {
2985 //eof
2986 if (size >= 0) {
2987 //not all size bytes were sent
2988 xsink->raiseException("SOCKET-SEND-ERROR", "Unexpected end of stream");
2989 return;
2990 }
2991 break;
2992 }
2993
2994 ssize_t rc = sendIntern(xsink, "Socket", "sendFromInputStream", buf, r, timeout, total);
2995 if (rc < 0) {
2996 return;
2997 }
2998 do_data_event(QORE_EVENT_SOCKET_DATA_SENT, QORE_SOURCE_SOCKET, buf, r);
2999 sent += r;
3000 }
3001 th.finalize(total);
3002 }
3003
3004 DLLLOCAL void sendHttpChunkedBodyFromInputStream(InputStream* is, size_t max_chunk_size, int timeout, ExceptionSink* xsink, QoreThreadLock* l, const ResolvedCallReferenceNode* trailer_callback) {
3005 if (sock == QORE_INVALID_SOCKET) {
3006 se_not_open("Socket", "sendHttpChunkedBodyFromInputStream", xsink);
3007 return;
3008 }
3009 if (in_op >= 0) {
3010 if (in_op == q_gettid()) {
3011 se_in_op("Socket", "sendHttpChunkedBodyFromInputStream", xsink);
3012 return;
3013 }
3014 se_in_op_thread("Socket", "sendHttpChunkedBodyFromInputStream", xsink);
3015 return;
3016 }
3017
3018 qore_socket_op_helper oh(this);
3019
3020 PrivateQoreSocketThroughputHelper th(this, true);
3021
3022 // set the non-blocking flag (for use with non-ssl connections)
3023 bool nb = (timeout >= 0);
3024 // set non-blocking I/O (and restore on exit) if we have a timeout and a non-ssl connection
3025 OptionalNonBlockingHelper onbh(*this, !ssl && nb, xsink);
3026 if (*xsink)
3027 return;
3028
3030 // reserve enough space for the maximum size of the buffer + HTTP overhead
3031 buf->preallocate(max_chunk_size);
3032 int64 total = 0;
3033 while (true) {
3034 int64 r;
3035 {
3036 AutoUnlocker al(l);
3037 r = is->read((void*)buf->getPtr(), sizeof(max_chunk_size), xsink);
3038 if (*xsink)
3039 return;
3040 }
3041
3042 // send HTTP chunk prelude with chunk size
3043 QoreString str;
3044 str.sprintf("%x\r\n", (int)r);
3045 int rc = sendIntern(xsink, "Socket", "sendHttpChunkedBodyFromInputStream", str.c_str(), str.size(), timeout, total, true);
3046 if (rc < 0)
3047 return;
3048
3049 bool trailers = false;
3050
3051 // send chunk data, if any
3052 if (r) {
3053 rc = sendIntern(xsink, "Socket", "sendHttpChunkedBodyFromInputStream", (const char*)buf->getPtr(), r, timeout, total, true);
3054 if (rc < 0)
3055 return;
3056 do_data_event(QORE_EVENT_HTTP_CHUNKED_DATA_SENT, QORE_SOURCE_SOCKET, buf->getPtr(), r);
3057 } else if (trailer_callback) {
3058 // get and send chunk trailers, if any
3060
3061 if (runTrailerCallback(xsink, "Socket", "sendHttpChunkedBodyFromInputStream", *trailer_callback, l, h))
3062 return;
3063 if (h) {
3064 str.clear();
3065 do_headers(str, *h, 0, false);
3066
3067 rc = sendIntern(xsink, "Socket", "sendHttpChunkedBodyFromInputStream", str.c_str(), str.size(), timeout, total, true);
3068 if (rc < 0)
3069 return;
3070
3071 do_header_event(QORE_EVENT_HTTP_FOOTERS_SENT, QORE_SOURCE_SOCKET, **h);
3072 trailers = true;
3073 }
3074 }
3075
3076 // close chunk if we sent no trailers
3077 if (!trailers) {
3078 str.set("\r\n");
3079 rc = sendIntern(xsink, "Socket", "sendHttpChunkedBodyFromInputStream", str.c_str(), str.size(), timeout, total, true);
3080 if (rc < 0)
3081 return;
3082 }
3083
3084 if (!r) {
3085 // end of stream
3086 break;
3087 }
3088 }
3089 th.finalize(total);
3090 }
3091
3092 DLLLOCAL void sendHttpChunkedBodyTrailer(const QoreHashNode* headers, int timeout, ExceptionSink* xsink) {
3093 if (sock == QORE_INVALID_SOCKET) {
3094 se_not_open("Socket", "sendHttpChunkedBodyTrailer", xsink);
3095 return;
3096 }
3097 if (in_op >= 0) {
3098 if (in_op == q_gettid()) {
3099 se_in_op("Socket", "sendHttpChunkedBodyTrailer", xsink);
3100 return;
3101 }
3102 se_in_op_thread("Socket", "sendHttpChunkedBodyTrailer", xsink);
3103 return;
3104 }
3105
3106 QoreString buf;
3107 if (!headers) {
3108 ConstHashIterator hi(headers);
3109
3110 while (hi.next()) {
3111 const QoreValue v = hi.get();
3112 const char* key = hi.getKey();
3113
3114 if (v.getType() == NT_LIST) {
3115 ConstListIterator li(v.get<const QoreListNode>());
3116 while (li.next())
3117 do_header(key, buf, li.getValue());
3118 }
3119 else
3120 do_header(key, buf, v);
3121 }
3122 }
3123 buf.concat("\r\n");
3124 int64 total;
3125 sendIntern(xsink, "Socket", "sendHttpChunkedBodyTrailer", buf.c_str(), buf.size(), timeout, total, true);
3126 if (!*xsink) {
3127 do_header_event(QORE_EVENT_HTTP_FOOTERS_SENT, QORE_SOURCE_SOCKET, *headers);
3128 }
3129 }
3130
3131 DLLLOCAL void getSendHttpMessageHeaders(QoreString& hdr, QoreHashNode* info, const char* method, const char* path,
3132 const char* http_version, const QoreHashNode* headers, size_t size, int source) {
3133 // prepare header string
3134 hdr.sprintf("%s %s HTTP/%s", method, path && path[0] ? path : "/", http_version);
3135
3136 // write request-uri key if info hash is non-null
3137 if (info) {
3138 info->setKeyValue("request-uri", new QoreStringNode(hdr), nullptr);
3139 }
3140
3141 getSendHttpMessageHeadersCommon(hdr, info, headers, size, source);
3142 }
3143
3144 DLLLOCAL void getSendHttpMessageHeadersCommon(QoreString& hdr, QoreHashNode* info, const QoreHashNode* headers,
3145 size_t size, int source) {
3146 // send event
3147 do_send_http_message_event(hdr, headers, source);
3148
3149 // add headers
3150 hdr.concat("\r\n");
3151 // insert headers
3152 do_headers(hdr, headers, size);
3153 }
3154
3155 DLLLOCAL int sendHttpMessage(ExceptionSink* xsink, QoreHashNode* info, const char* cname, const char* mname,
3156 const char* method, const char* path, const char* http_version, const QoreHashNode* headers,
3157 const QoreStringNode* body, const void* data, size_t size,
3158 const ResolvedCallReferenceNode* send_callback, InputStream* input_stream, size_t max_chunk_size,
3159 const ResolvedCallReferenceNode* trailer_callback, int source, int timeout_ms = -1,
3160 QoreThreadLock* l = nullptr, bool* aborted = nullptr) {
3161 // prepare header string
3162 QoreString hdr(enc);
3163
3164 hdr.sprintf("%s %s HTTP/%s", method, path && path[0] ? path : "/", http_version);
3165
3166 // write request-uri key if info hash is non-null
3167 if (info) {
3168 info->setKeyValue("request-uri", new QoreStringNode(hdr), nullptr);
3169 }
3170
3171 return sendHttpMessageCommon(xsink, hdr, info, cname, mname, headers, body, data, size, send_callback,
3172 input_stream, max_chunk_size, trailer_callback, source, timeout_ms, l, aborted);
3173 }
3174
3175 DLLLOCAL int sendHttpResponse(ExceptionSink* xsink, QoreHashNode* info, const char* cname, const char* mname,
3176 int code, const char* desc, const char* http_version, const QoreHashNode* headers, const QoreStringNode* body,
3177 const void* data, size_t size, const ResolvedCallReferenceNode* send_callback, InputStream* input_stream,
3178 size_t max_chunk_size, const ResolvedCallReferenceNode* trailer_callback, int source, int timeout_ms = -1,
3179 QoreThreadLock* l = nullptr, bool* aborted = nullptr) {
3180 // prepare header string
3181 QoreString hdr(enc);
3182
3183 // write HTTP response status line
3184 hdr.sprintf("HTTP/%s %03d %s", http_version, code, desc);
3185
3186 // write the status line as the "response-uri" key if info hash is non-null
3187 // NOTE: this is not a URI, so the name is not really appropriate
3188 if (info) {
3189 info->setKeyValue("response-uri", new QoreStringNode(hdr), nullptr);
3190 }
3191
3192 return sendHttpMessageCommon(xsink, hdr, info, cname, mname, headers, body, data, size, send_callback,
3193 input_stream, max_chunk_size, trailer_callback, source, timeout_ms, l, aborted);
3194 }
3195
3196 DLLLOCAL int sendHttpMessageCommon(ExceptionSink* xsink, QoreString& hdr, QoreHashNode* info, const char* cname,
3197 const char* mname, const QoreHashNode* headers, const QoreStringNode* body, const void* data,
3198 size_t size, const ResolvedCallReferenceNode* send_callback, InputStream* input_stream,
3199 size_t max_chunk_size, const ResolvedCallReferenceNode* trailer_callback, int source, int timeout_ms = -1,
3200 QoreThreadLock* l = nullptr, bool* aborted = nullptr) {
3201 assert(xsink);
3202 assert(!(data && send_callback));
3203 assert(!(data && input_stream));
3204 assert(!(send_callback && input_stream));
3205
3206 // send event
3207 do_send_http_message_event(hdr, headers, source);
3208
3209 // add headers
3210 hdr.concat("\r\n");
3211 // insert headers
3212 do_headers(hdr, headers, size && data ? size : 0);
3213
3214 //printd(5, "qore_socket_private::sendHttpMessage() hdr: %s\n", hdr.c_str());
3215
3216 // send URI and headers
3217 int rc;
3218 if ((rc = send(xsink, cname, mname, hdr.c_str(), hdr.size(), timeout_ms, -1)))
3219 return rc;
3220
3221 // header message sent above with do_sent_http_message_event()
3222 if (size && data) {
3223 int rc = send(xsink, cname, mname, (char*)data, size, timeout_ms, -1);
3224 if (!rc) {
3225 if (body) {
3226 do_data_event(QORE_EVENT_SOCKET_DATA_SENT, source, *body);
3227 } else {
3228 do_data_event(QORE_EVENT_SOCKET_DATA_SENT, source, data, size);
3229 }
3230 }
3231 return rc;
3232 } else if (send_callback) {
3233 assert(l);
3234 assert(!aborted || !(*aborted));
3235 return sendHttpChunkedWithCallback(xsink, cname, mname, *send_callback, *l, source, timeout_ms, aborted);
3236 } else if (input_stream) {
3237 assert(l);
3238 sendHttpChunkedBodyFromInputStream(input_stream, max_chunk_size, timeout_ms, xsink, l, trailer_callback);
3239 return *xsink ? -1 : 0;
3240 }
3241
3242 return 0;
3243 }
3244
// Reads an HTTP chunked message body from the socket as binary data.
//
// Each chunk is read as: a size line terminated by CRLF and parsed as a hexadecimal number (anything from the
// first ';' on is cut off), then that many bytes of data, then two more bytes (not checked to be CRLF).
// A chunk size of zero ends the chunk loop; afterwards readHTTPData() is called to read trailer headers.
//
// How the data is delivered depends on the optional arguments:
// - os:            every piece of chunk data is written to the output stream under an AutoUnlocker on "l"
// - recv_callback: (only without os) each complete chunk is passed to runDataCallback() and the buffer is
//                  cleared; trailer headers are passed to runHeaderCallback()
// - read_once:     returns after the first non-empty chunk with a hash holding only the "body" key
// - none of these: all chunk data is accumulated and returned under the "body" key
//
// Returns a new hash, or 0 / nullptr when an exception was raised, when the peer closed the connection, or
// when recv_callback is set.
//
// NOTE(review): "info" is used near the end of this function but its declaration is not visible in this
// listing (the line numbered 3411 is missing); presumably a ReferenceHolder<QoreHashNode> - confirm against
// the original header
3245 DLLLOCAL QoreHashNode* readHttpChunkedBodyBinary(int timeout, ExceptionSink* xsink, const char* cname, int source,
3246 const ResolvedCallReferenceNode* recv_callback = nullptr, QoreThreadLock* l = nullptr,
3247 QoreObject* obj = nullptr, OutputStream* os = nullptr, bool read_once = false) {
3248 assert(xsink);
3249
3250 if (sock == QORE_INVALID_SOCKET) {
3251 se_not_open(cname, "readHTTPChunkedBodyBinary", xsink);
3252 return 0;
3253 }
3254 if (in_op >= 0) {
3255 if (in_op == q_gettid()) {
3256 se_in_op(cname, "readHTTPChunkedBodyBinary", xsink);
3257 return 0;
3258 }
3259 se_in_op_thread(cname, "readHTTPChunkedBodyBinary", xsink);
3260 return 0;
3261 }
3262
3263 // reset "expecting HTTP chunked body" flag
3264 if (http_exp_chunked_body)
3265 http_exp_chunked_body = false;
3266
3267 qore_socket_op_helper oh(this);
3268
// no accumulation buffer is allocated when the data goes to an output stream
3269 SimpleRefHolder<BinaryNode> b(os ? nullptr : new BinaryNode);
3270 QoreString str; // for reading the size of each chunk
3271
3272 ssize_t rc;
3273 // read the size then read the data and append to buffer
3274 while (true) {
3275 // state = 0, nothing
3276 // state = 1, \r received
3277 int state = 0;
// read the chunk size line one byte at a time until CRLF is seen
3278 while (true) {
3279 char* buf;
3280 rc = brecv(xsink, "readHTTPChunkedBodyBinary", buf, 1, 0, timeout, false);
3281 if (rc <= 0) {
// no exception with rc == 0 is reported as a closed connection
3282 if (!*xsink) {
3283 assert(!rc);
3284 se_closed(cname, "readHTTPChunkedBodyBinary", xsink);
3285 }
3286 return 0;
3287 }
3288
3289 char c = buf[0];
3290
3291 if (!state && c == '\r')
3292 state = 1;
3293 else if (state && c == '\n')
3294 break;
3295 else {
// a '\r' that was not followed by '\n' belongs to the size line: put it back
3296 if (state) {
3297 state = 0;
3298 str.concat('\r');
3299 }
3300 str.concat(c);
3301 }
3302 }
3303 // DEBUG
3304 //printd(5, "QoreSocket::readHTTPChunkedBodyBinary(): got chunk size (" QSD " bytes) string: %s\n",
3305 // str.strlen(), str.c_str());
3306
3307 // terminate string at ';' char if present
3308 char* p = (char*)strchr(str.c_str(), ';');
3309 if (p)
3310 *p = '\0';
// the chunk size is parsed as a hexadecimal number
3311 long size = strtol(str.c_str(), 0, 16);
3312 do_chunked_read(QORE_EVENT_HTTP_CHUNK_SIZE, size, str.size(), source);
3313
// a zero-size chunk ends the body
3314 if (!size) {
3315 break;
3316 }
3317
3318 if (size < 0) {
3319 xsink->raiseException("READ-HTTP-CHUNK-ERROR", "negative value given for chunk size (%ld)", size);
3320 return 0;
3321 }
3322
3323 // prepare string for chunk
3324 //str.allocate(size + 1);
3325
// request at most DEFAULT_SOCKET_BUFSIZE bytes per read; "bs" shrinks to the bytes still missing
3326 ssize_t bs = size < DEFAULT_SOCKET_BUFSIZE ? size : DEFAULT_SOCKET_BUFSIZE;
3327 ssize_t br = 0; // bytes received
3328 while (true) {
3329 char* buf;
3330 rc = brecv(xsink, "readHTTPChunkedBodyBinary", buf, bs, 0, timeout, false);
3331 //printd(5, "qore_socket_private::readHTTPChunkedBodyBinary() str: '%s' bs: %lld rc: %lld b: %p "
3332 // "(%lld) recv_callback: %p\n", str.c_str(), bs, rc, *b, b->size(), recv_callback);
3333 if (rc <= 0) {
3334 if (!*xsink) {
3335 assert(!rc);
3336 se_closed(cname, "readHTTPChunkedBodyBinary", xsink);
3337 }
3338 return nullptr;
3339 }
3340
3341 do_data_event(QORE_EVENT_HTTP_CHUNKED_DATA_READ, source, buf, (size_t)rc);
3342
3343 if (os) {
// the stream write is done under an AutoUnlocker on "l"
3344 AutoUnlocker al(l);
3345 os->write(buf, rc, xsink);
3346 if (*xsink)
3347 return nullptr;
3348 } else {
3349 b->append(buf, rc);
3350 }
3351 br += rc;
3352
3353 if (br >= size) {
3354 break;
3355 }
3356 if (size - br < bs) {
3357 bs = size - br;
3358 }
3359 }
3360
3361 // DEBUG
3362 //printd(5, "QoreSocket::readHTTPChunkedBodyBinary(): received binary chunk: size: %d br=" QSD " total=" QSD "\n", size, br, b->size());
3363
3364 // read crlf after chunk
3365 // FIXME: bytes read are not checked if they equal CRLF
3366 br = 0;
3367 while (br < 2) {
3368 char* buf;
3369 rc = brecv(xsink, "readHTTPChunkedBodyBinary", buf, 2 - br, 0, timeout, false);
3370 if (rc <= 0) {
3371 if (!*xsink) {
3372 assert(!rc);
3373 se_closed(cname, "readHTTPChunkedBodyBinary", xsink);
3374 }
3375 return nullptr;
3376 }
3377 br += rc;
3378 }
3379
3380 do_chunked_read(QORE_EVENT_HTTP_CHUNKED_DATA_RECEIVED, size, size + 2, source);
3381
// callback mode: hand over the complete chunk and start the next one with an empty buffer
3382 if (recv_callback && !os) {
3383 if (runDataCallback(xsink, cname, "readHTTPChunkedBodyBinary", *recv_callback, l, *b, true))
3384 return nullptr;
3385 if (b)
3386 b->clear();
3387 }
3388
// single-chunk mode: return the first chunk immediately without reading trailers
3389 if (read_once) {
3390 assert(!recv_callback);
3391 assert(!os);
3392 ReferenceHolder<QoreHashNode> h(new QoreHashNode(autoTypeInfo), xsink);
3393 h->setKeyValue("body", b.release(), xsink);
3394 return h.release();
3395 }
3396
3397 // ensure string is blanked for next read
3398 str.clear();
3399 }
3400
3401 // read footers or nothing
3402 QoreStringNodeHolder hdr(readHTTPData(xsink, "readHTTPChunkedBodyBinary", timeout, rc, true));
3403 if (*xsink)
3404 return nullptr;
3405
3406 ReferenceHolder<QoreHashNode> h(new QoreHashNode(autoTypeInfo), xsink);
// the accumulated body is only returned when it was not delivered through a callback or a stream
3407 if (!recv_callback && !os) {
3408 h->setKeyValue("body", b.release(), xsink);
3409 }
3410
3412
3413 if (hdr) {
// trailer data of 2 to 4 bytes is not parsed for headers
3414 if (hdr->strlen() >= 2 && hdr->strlen() <= 4)
3415 return recv_callback ? 0 : h.release();
3416
3417 if (recv_callback) {
3418 info = new QoreHashNode(autoTypeInfo);
3419 }
3420 convertHeaderToHash(*h, (char*)hdr->c_str(), 0, *info, nullptr, "response-headers-raw");
3421 do_read_http_header(QORE_EVENT_HTTP_FOOTERS_RECEIVED, *h, source);
3422 }
3423
3424 if (recv_callback) {
3425 runHeaderCallback(xsink, cname, "readHTTPChunkedBodyBinary", *recv_callback, l, h->empty() ? nullptr : *h,
3426 info.release(), false, obj);
3427 return 0;
3428 }
3429
3430 return h.release();
3431 }
3432
3433 // receive a message in HTTP chunked format
// Reads an HTTP chunked message body from the socket as string data.
//
// Each chunk is read as: a size line terminated by CRLF and parsed as a hexadecimal number (anything from the
// first ';' on is cut off), then that many bytes of data, then two more bytes (not checked to be CRLF).
// A chunk size of zero ends the chunk loop; afterwards readHTTPData() is called to read trailer headers.
//
// Without recv_callback, all chunk data is accumulated in "buf" and returned under the "body" key together
// with any trailer headers. With recv_callback, each complete chunk is passed to runDataCallback() and the
// buffer is cleared, trailer headers are passed to runHeaderCallback(), and 0 is returned.
//
// Returns a new hash, or 0 / nullptr when an exception was raised, when the peer closed the connection, or
// when recv_callback is set.
//
// NOTE(review): "buf" and "info" are used below but their declarations are not visible in this listing (the
// lines numbered 3457 and 3579 are missing); presumably a QoreStringNode holder and a
// ReferenceHolder<QoreHashNode> - confirm against the original header
3434 DLLLOCAL QoreHashNode* readHttpChunkedBody(int timeout, ExceptionSink* xsink, const char* cname, int source,
3435 const ResolvedCallReferenceNode* recv_callback = 0, QoreThreadLock* l = 0, QoreObject* obj = 0) {
3436 assert(xsink);
3437
3438 if (sock == QORE_INVALID_SOCKET) {
3439 se_not_open(cname, "readHTTPChunkedBody", xsink);
3440 return 0;
3441 }
3442 if (in_op >= 0) {
3443 if (in_op == q_gettid()) {
3444 se_in_op(cname, "readHTTPChunkedBody", xsink);
3445 return 0;
3446 }
3447 se_in_op_thread(cname, "readHTTPChunkedBody", xsink);
3448 return 0;
3449 }
3450
3451 // reset "expecting HTTP chunked body" flag
3452 if (http_exp_chunked_body)
3453 http_exp_chunked_body = false;
3454
3455 qore_socket_op_helper oh(this);
3456
3458 QoreString str; // for reading the size of each chunk
3459
3460 ssize_t rc;
3461 // read the size then read the data and append to buf
3462 while (true) {
3463 // state = 0, nothing
3464 // state = 1, \r received
3465 int state = 0;
// read the chunk size line one byte at a time until CRLF is seen
3466 while (true) {
3467 char* tbuf;
3468 rc = brecv(xsink, "readHTTPChunkedBody", tbuf, 1, 0, timeout, false);
3469 if (rc <= 0) {
// no exception with rc == 0 is reported as a closed connection
3470 if (!*xsink) {
3471 assert(!rc);
3472 se_closed(cname, "readHTTPChunkedBody", xsink);
3473 }
3474 return 0;
3475 }
3476
3477 char c = tbuf[0];
3478
3479 if (!state && c == '\r')
3480 state = 1;
3481 else if (state && c == '\n')
3482 break;
3483 else {
// a '\r' that was not followed by '\n' belongs to the size line: put it back
3484 if (state) {
3485 state = 0;
3486 str.concat('\r');
3487 }
3488 str.concat(c);
3489 }
3490 }
3491 // DEBUG
3492 //printd(5, "got chunk size (" QSD " bytes) string: %s\n", str.strlen(), str.c_str());
3493
3494 // terminate string at ';' char if present
3495 char* p = (char*)strchr(str.c_str(), ';');
3496 if (p)
3497 *p = '\0';
// the chunk size is parsed as a hexadecimal number
3498 ssize_t size = strtol(str.c_str(), 0, 16);
3499 do_chunked_read(QORE_EVENT_HTTP_CHUNK_SIZE, size, str.strlen(), source);
3500
// a zero-size chunk ends the body
3501 if (!size)
3502 break;
3503
3504 if (size < 0) {
3505 xsink->raiseException("READ-HTTP-CHUNK-ERROR", "negative value given for chunk size (%ld)", size);
3506 return 0;
3507 }
3508 // ensure string is blanked for next read
3509 str.clear();
3510
3511 // prepare string for chunk
3512 //buf->allocate((unsigned)(buf->strlen() + size + 1));
3513
3514 // read chunk directly into string buffer
// request at most DEFAULT_SOCKET_BUFSIZE bytes per read; "bs" shrinks to the bytes still missing
3515 ssize_t bs = size < DEFAULT_SOCKET_BUFSIZE ? size : DEFAULT_SOCKET_BUFSIZE;
3516 ssize_t br = 0; // bytes received
// NOTE(review): "str" was already cleared a few lines above; this second clear() looks redundant
3517 str.clear();
3518 while (true) {
3519 char* tbuf;
3520 rc = brecv(xsink, "readHTTPChunkedBody", tbuf, bs, 0, timeout, false);
3521 if (rc <= 0) {
3522 if (!*xsink) {
3523 assert(!rc);
3524 se_closed(cname, "readHTTPChunkedBody", xsink);
3525 }
3526 return 0;
3527 }
3528
3529 br += rc;
3530 buf->concat(tbuf, rc);
3531
3532 do_data_event(QORE_EVENT_HTTP_CHUNKED_DATA_READ, source, tbuf, (size_t)rc);
3533
3534 if (br >= size)
3535 break;
3536 if (size - br < bs)
3537 bs = size - br;
3538 }
3539
3540 // DEBUG
3541 //printd(5, "got chunk (" QSD " bytes): %s\n", br, buf->c_str() + buf->strlen() - size);
3542
3543 // read crlf after chunk
3544 // FIXME: bytes read are not checked if they equal CRLF
3545 br = 0;
3546 while (br < 2) {
3547 char* tbuf;
3548 rc = brecv(xsink, "readHTTPChunkedBody", tbuf, 2 - br, 0, timeout, false);
3549 if (rc <= 0) {
3550 if (!*xsink) {
3551 assert(!rc);
3552 se_closed(cname, "readHTTPChunkedBody", xsink);
3553 }
3554 return nullptr;
3555 }
3556 br += rc;
3557 }
3558
3559 do_chunked_read(QORE_EVENT_HTTP_CHUNKED_DATA_RECEIVED, size, size + 2, source);
3560
// callback mode: hand over the complete chunk and start the next one with an empty buffer
3561 if (recv_callback) {
3562 if (runDataCallback(xsink, cname, "readHTTPChunkedBody", *recv_callback, l, *buf, true))
3563 return nullptr;
3564 buf->clear();
3565 }
3566 }
3567
3568 // read footers or nothing
3569 QoreStringNodeHolder hdr(readHTTPData(xsink, "readHTTPChunkedBody", timeout, rc, true));
3570 if (*xsink)
3571 return nullptr;
3572
3573 //printd(5, "chunked body encoding: %s\n", buf->getEncoding()->getCode());
3574 ReferenceHolder<QoreHashNode> h(new QoreHashNode(autoTypeInfo), xsink);
// the accumulated body is only returned when it was not delivered through the callback
3575 if (!recv_callback) {
3576 h->setKeyValue("body", buf.release(), xsink);
3577 }
3578
3580
3581 if (hdr) {
// trailer data of 2 to 4 bytes is not parsed for headers
3582 if (hdr->strlen() >= 2 && hdr->strlen() <= 4)
3583 return recv_callback ? 0 : h.release();
3584
3585 if (recv_callback) {
3586 info = new QoreHashNode(autoTypeInfo);
3587 }
3588 convertHeaderToHash(*h, (char*)hdr->c_str(), 0, *info, nullptr, "response-headers-raw");
3589 do_read_http_header(QORE_EVENT_HTTP_FOOTERS_RECEIVED, *h, source);
3590 }
3591
3592 if (recv_callback) {
3593 runHeaderCallback(xsink, cname, "readHTTPChunkedBody", *recv_callback, l, h->empty() ? nullptr : *h,
3594 info.release(), false, obj);
3595 return 0;
3596 }
3597
3598 return h.release();
3599 }
3600
3601 DLLLOCAL QoreHashNode* readServerSentEvent(ExceptionSink* xsink, Transform* transform, int timeout_ms);
3602
3603 DLLLOCAL static QoreHashNode* parseServerSentEvent(ExceptionSink* xsink, const QoreString& buf);
3604
// Splits a comma-separated header value into a list of tokens and stores it in "info".
//
// For every comma-separated element, the text up to the first ';' or ',' is collected, trimmed and, if not
// empty, pushed onto a list; everything between a ';' and the next ',' is skipped. If the list is not empty
// it is stored under the "accept-encoding" key; otherwise "info" is left unchanged.
//
// NOTE(review): "str" is used below but its declaration is not visible in this listing (the line numbered
// 3613 is missing); presumably a QoreStringNode holder created for each element - confirm against the
// original header
3605 DLLLOCAL static void do_accept_encoding(char* t, QoreHashNode& info) {
3606 ReferenceHolder<QoreListNode> l(new QoreListNode(autoTypeInfo), 0);
3607
3608 char* a = t;
// "ok" is true when the scan position is at the start of a list element
3609 bool ok = true;
3610 while (*a) {
3611 if (ok) {
3612 ok = false;
// collect the element's token; stops at ';', ',' or the end of the string
3614 while (*a && *a != ';' && *a != ',')
3615 str->concat(*(a++));
3616 str->trim();
3617 if (!str->empty())
3618 l->push(str.release(), nullptr);
// "a" was already advanced by the inner loop, so skip the increment below
3619 continue;
3620 }
3621 else if (*a == ',')
3622 ok = true;
3623
3624 ++a;
3625 }
3626
3627 if (!l->empty())
3628 info.setKeyValue("accept-encoding", l.release(), 0);
3629 }
3630
// Derives the "accept-charset" value for "info" from a header value.
//
// The value is scanned as a comma-separated list. If the first character of an element is '*', or an element
// starts with "utf8" / "utf-8" (the letters are matched case-insensitively), "accept-charset" is set to
// "utf8". Otherwise the text of "t" up to the first ',' or ';' (or all of "t" when neither occurs) is
// trimmed and, if not empty, stored as "accept-charset".
//
// Returns true if the "accept-charset" key was set, false if not.
//
// NOTE(review): an element is only tested at the character directly following the ','; a blank after the
// comma (", utf-8") consumes the start-of-element state, so such an element is not recognized as UTF-8
// NOTE(review): "ac" is used below but its declaration is not visible in this listing (the line numbered
// 3678 is missing); presumably a QoreStringNode holder - confirm against the original header
3631 DLLLOCAL bool do_accept_charset(char* t, QoreHashNode& info) {
3632 bool acceptcharset = false;
3633
3634 // see if we have "*" or utf8 or utf-8, in which case set it
3635 // otherwise set the first charset in the list
3636 char* a = t;
// "div" marks the first ',' or ';' seen, i.e. the end of the first charset in the list
3637 char* div = 0;
3638 bool utf8 = false;
// "ok" is true when the scan position is at the start of a list element
3639 bool ok = true;
3640 while (*a) {
3641 if (ok) {
3642 if (*a == '*') {
3643 utf8 = true;
3644 break;
3645 }
3646 ok = false;
// match "utf", an optional '-', then '8'
3647 if (*a == 'u' || *a == 'U') {
3648 ++a;
3649 if (*a == 't' || *a == 'T') {
3650 ++a;
3651 if (*a == 'f' || *a == 'F') {
3652 ++a;
3653 if (*a == '-')
3654 ++a;
3655 if (*a == '8') {
3656 utf8 = true;
3657 break;
3658 }
3659 }
3660 }
// no match: re-examine the current character as ordinary element content
3661 continue;
3662 }
3663 } else if (*a == ',') {
3664 if (!div)
3665 div = a;
3666 ok = true;
3667 } else if (*a == ';') {
3668 if (!div)
3669 div = a;
3670 }
3671
3672 ++a;
3673 }
3674 if (utf8) {
3675 info.setKeyValue("accept-charset", new QoreStringNode("utf8"), 0);
3676 acceptcharset = true;
3677 } else {
// fall back to the first charset in the list
3679 if (div)
3680 ac->concat(t, div - t);
3681 else
3682 ac->concat(t);
3683 ac->trim();
3684 if (!ac->empty()) {
3685 info.setKeyValue("accept-charset", ac.release(), 0);
3686 acceptcharset = true;
3687 }
3688 }
3689
3690 return acceptcharset;
3691 }
3692
3693 // returns true if the connection should be closed, false if not
// Parses raw HTTP header text into the hash "h".
//
// The buffer "p" is modified in place: it is split into lines at "\r\n", "\n" or "\r" and each line is split
// at its first ':'. Parsing stops at the first line that has no line terminator or no ':'. Header names are
// lower-cased before being used as keys in "h"; a header that occurs more than once is stored as a list of
// values.
//
// h:               receives the parsed headers
// p:               writable, null-terminated header text
// flags:           CHF_HTTP11 sets the default for the return value and selects how "connection" is read;
//                  CHF_PROCESS enables the processing of "connection", "content-type", "transfer-encoding",
//                  "accept-charset" and (together with CHF_REQUEST) "accept-encoding"
// info:            optional; receives the headers with their original-case names under "headers_raw_key" and,
//                  with CHF_PROCESS, the keys "charset", "body-content-type", "accept-charset" and
//                  "accept-encoding"
// chunked:         optional; set to true when "transfer-encoding" is "chunked" (compared without regard to
//                  case); never set to false here
// headers_raw_key: the key used in "info" for the raw header hash
//
// With CHF_PROCESS the socket's encoding ("enc") is also updated from the "content-type" header, or set from
// "assume_http_encoding" when no "content-type" header was processed.
//
// NOTE(review): "ct" is used as a string holder in the charset branch but its declaration is not visible in
// this listing (the line numbered 3772 is missing); presumably a QoreStringNode holder - confirm against the
// original header
3694 DLLLOCAL bool convertHeaderToHash(QoreHashNode* h, char* p, int flags = 0, QoreHashNode* info = nullptr,
3695 bool* chunked = nullptr, const char* headers_raw_key = "headers-raw") {
// the default is to close the connection unless the CHF_HTTP11 flag is set
3696 bool close = !(flags & CHF_HTTP11);
3697 // socket encoding
3698 const char* senc = nullptr;
3699 // accept-charset
3700 bool acceptcharset = false;
3701
3702 QoreHashNode* raw_hdr = nullptr;
3703 if (info) {
3704 info->setKeyValue(headers_raw_key, raw_hdr = new QoreHashNode(autoTypeInfo), nullptr);
3705 }
3706
3707 // raw key for setting raw headers
3708 std::string raw_key;
3709
3710 while (*p) {
3711 char* buf = p;
3712
// cut the current line out of the buffer; "p" is moved to the start of the next line
3713 if ((p = strstr(buf, "\r\n"))) {
3714 *p = '\0';
3715 p += 2;
3716 } else if ((p = strchr(buf, '\n'))) {
3717 *p = '\0';
3718 p++;
3719 } else if ((p = strchr(buf, '\r'))) {
3720 *p = '\0';
3721 p++;
3722 } else
3723 break;
// split the line at the first ':' into name ("buf") and value ("t")
3724 char* t = strchr(buf, ':');
3725 if (!t)
3726 break;
3727 *t = '\0';
3728 t++;
3729 while (t && qore_isblank(*t))
3730 t++;
// keep the name in its original case before it is lower-cased
3731 if (raw_hdr) {
3732 raw_key = buf;
3733 }
3734 strtolower(buf);
3735 //printd(5, "setting %s = '%s'\n", buf, t);
3736
3737 ReferenceHolder<> val(new QoreStringNode(t), nullptr);
3738
3739 if (flags & CHF_PROCESS) {
3740 if (!strcmp(buf, "connection")) {
// with CHF_HTTP11 only "close" changes the default; without it only "keep-alive" does
3741 if (flags & CHF_HTTP11) {
3742 if (strcasestr(t, "close"))
3743 close = true;
3744 } else {
3745 if (strcasestr(t, "keep-alive"))
3746 close = false;
3747 }
3748 } else if (!strcmp(buf, "content-type")) {
3749 char* a = strcasestr(t, "charset=");
3750 if (a) {
3751 // find end
3752 char* e = strchr(a + 8, ';');
3753
// "cs" receives the text following "charset=" up to the next ';' or the end of the value
3754 QoreString cs;
3755 if (e) {
3756 cs.concat(a + 8, e - a - 8);
3757 } else {
3758 cs.concat(a + 8);
3759 }
3760 cs.trim();
3761 senc = cs.c_str();
3762 //printd(5, "got encoding '%s' from request\n", senc);
3763 enc = QEM.findCreate(senc);
3764
// NOTE(review): giveBuffer() hands the buffer "senc" points to over to the new string node; in the
// visible code "senc" is afterwards only tested for being non-null
3765 if (info) {
3766 size_t len = cs.size();
3767 info->setKeyValue("charset", new QoreStringNode(cs.giveBuffer(), len, len + 1,
3768 QCS_DEFAULT), nullptr);
3769 }
3770
// store the content type with the "charset=..." parameter removed as "body-content-type"
3771 if (info) {
3773 // remove any whitespace and ';' before charset=
3774 if (a != t) {
3775 do {
3776 --a;
3777 } while (a > t && (*a == ' ' || *a == ';'));
3778 }
3779
3780 if (a == t) {
3781 if (e) {
3782 ct->concat(e + 1);
3783 }
3784 } else {
3785 ct->concat(t, a - t + 1);
3786 if (e) {
3787 ct->concat(e);
3788 }
3789 }
3790 ct->trim();
3791 if (!ct->empty()) {
3792 info->setKeyValue("body-content-type", ct.release(), nullptr);
3793 }
3794 }
3795 } else {
3796 // check for content types that imply UTF-8 encoding
3797 std::string ct = t;
3798 ct = ct.substr(0, ct.find(';'));
3799 if (utf8_content_type_set.find(ct) != utf8_content_type_set.end()) {
3800 senc = "UTF-8";
3801 enc = QCS_UTF8;
3802 } else {
3803 senc = assume_http_encoding.c_str();
3804 enc = QEM.findCreate(assume_http_encoding.c_str());
3805 }
3806 if (info) {
3807 info->setKeyValue("charset", new QoreStringNode(senc), nullptr);
3808 info->setKeyValue("body-content-type", val->refSelf(), nullptr);
3809 }
3810 }
3811 } else if (chunked && !strcmp(buf, "transfer-encoding") && !strcasecmp(t, "chunked")) {
3812 *chunked = true;
3813 } else if (info) {
3814 if (!strcmp(buf, "accept-charset")) {
3815 acceptcharset = do_accept_charset(t, *info);
3816 } else if ((flags & CHF_REQUEST) && !strcmp(buf, "accept-encoding")) {
3817 do_accept_encoding(t, *info);
3818 }
3819 }
3820 }
3821
// "val" is released into "h" below, so the raw header hash gets its own copy of the value
3822 ReferenceHolder<> val_copy(nullptr);
3823 if (raw_hdr && val) {
3824 val_copy = val->realCopy();
3825 }
3826
3827 // see if header exists, and if so make it a list and add value to the list
3828 hash_assignment_priv ha(*h, buf);
3829 if (!(*ha).isNothing()) {
3830 QoreListNode* l;
3831 if ((*ha).getType() == NT_LIST) {
3832 l = (*ha).get<QoreListNode>();
3833 } else {
// replace the single value with a new list and push the previous value as its first element
3834 l = new QoreListNode(autoTypeInfo);
3835 l->push(ha.swap(l), nullptr);
3836 }
3837 l->push(val.release(), nullptr);
3838 } else // otherwise set header normally
3839 ha.assign(val.release(), 0);
3840
3841 // set raw headers if applicable
3842 if (raw_hdr) {
3843 hash_assignment_priv ha(*raw_hdr, raw_key);
3844 if (!(*ha).isNothing()) {
3845 QoreListNode* l;
3846 if ((*ha).getType() == NT_LIST) {
3847 l = (*ha).get<QoreListNode>();
3848 } else {
3849 l = new QoreListNode(autoTypeInfo);
3850 l->push(ha.swap(l), nullptr);
3851 }
3852 l->push(val_copy.release(), nullptr);
3853 } else // otherwise set header normally
3854 ha.assign(val_copy.release(), nullptr);
3855 }
3856 }
3857
3858 if ((flags & CHF_PROCESS)) {
// no "content-type" header was processed: use the assumed HTTP encoding
3859 if (!senc) {
3860 enc = QEM.findCreate(assume_http_encoding.c_str());
3861 }
3862 // according to RFC-2616 section 14.2, "If no Accept-Charset header is present, the default is that any
3863 // character set is acceptable" so we will use utf-8
3864 if (info && !acceptcharset) {
3865 info->setKeyValue("accept-charset", new QoreStringNode("utf8"), nullptr);
3866 }
3867 }
3868
3869 return close;
3870 }
3871
3872 DLLLOCAL int recvix(const char* meth, int len, void* targ, int timeout_ms, ExceptionSink* xsink) {
3873 assert(xsink);
3874 if (sock == QORE_INVALID_SOCKET) {
3875 se_not_open("Socket", meth, xsink, "recvix");
3876 return QSE_NOT_OPEN;
3877 }
3878 if (in_op >= 0) {
3879 if (in_op == q_gettid()) {
3880 se_in_op("Socket", meth, xsink);
3881 return 0;
3882 }
3883 se_in_op_thread("Socket", meth, xsink);
3884 return 0;
3885 }
3886
3887 PrivateQoreSocketThroughputHelper th(this, false);
3888
3889 char* buf;
3890 ssize_t br = 0;
3891 while (true) {
3892 ssize_t rc = brecv(xsink, meth, buf, len - br, 0, timeout_ms);
3893 if (rc <= 0) {
3894 do_read_error(rc, meth, timeout_ms, xsink);
3895 return (int)rc;
3896 }
3897
3898 memcpy(targ, buf, rc);
3899
3900 br += rc;
3901 if (br >= len)
3902 break;
3903 }
3904
3905 th.finalize(br);
3906 do_data_event(QORE_EVENT_SOCKET_DATA_READ, QORE_SOURCE_SOCKET, targ, br);
3907 return (int)br;
3908 }
3909
3910 DLLLOCAL void clearWarningQueue(ExceptionSink* xsink) {
3911 if (warn_queue) {
3912 if (warn_callback_arg) {
3913 warn_callback_arg.discard(xsink);
3914 warn_callback_arg = QoreValue();
3915 }
3916 warn_queue->deref(xsink);
3917 warn_queue = nullptr;
3918 tl_warning_us = 0;
3919 tp_warning_bs = 0.0;
3920 tp_us_min = 0;
3921 }
3922 }
3923
3924 DLLLOCAL void setWarningQueue(ExceptionSink* xsink, int64 warning_ms, int64 warning_bs, Queue* wq, QoreValue arg,
3925 int64 min_ms = 1000) {
3926 ReferenceHolder<Queue> qholder(wq, xsink);
3927 ValueHolder holder(arg, xsink);
3928 if (warning_ms <= 0 && warning_bs <= 0) {
3929 xsink->raiseException("SOCKET-SETWARNINGQUEUE-ERROR", "Socket::setWarningQueue() at least one of warning "
3930 "ms argument: " QLLD " and warning B/s argument: " QLLD " must be greater than zero; to clear, call "\
3931 "Socket::clearWarningQueue() with no arguments", warning_ms, warning_bs);
3932 return;
3933 }
3934
3935 if (warning_ms < 0)
3936 warning_ms = 0;
3937 if (warning_bs < 0)
3938 warning_bs = 0;
3939
3940 if (warn_queue) {
3941 warn_queue->deref(xsink);
3942 warn_callback_arg.discard(xsink);
3943 }
3944
3945 warn_queue = qholder.release();
3946 warn_callback_arg = holder.release();
3947 tl_warning_us = (int64)warning_ms * 1000;
3948 tp_warning_bs = warning_bs;
3949 tp_us_min = min_ms * 1000;
3950 }
3951
3952 DLLLOCAL void getUsageInfo(QoreHashNode& h, qore_socket_private& s) const {
3953 if (warn_queue) {
3954 h.setKeyValue("arg", warn_callback_arg.refSelf(), 0);
3955 h.setKeyValue("timeout", tl_warning_us, 0);
3956 h.setKeyValue("min_throughput", (int64)tp_warning_bs, 0);
3957 h.setKeyValue("min_throughput_us", (int64)tp_us_min, 0);
3958 }
3959
3960 h.setKeyValue("bytes_sent", tp_bytes_sent + s.tp_bytes_sent, 0);
3961 h.setKeyValue("bytes_recv", tp_bytes_recv + s.tp_bytes_sent, 0);
3962 h.setKeyValue("us_sent", tp_us_sent + s.tp_us_sent, 0);
3963 h.setKeyValue("us_recv", tp_us_recv + s.tp_us_recv, 0);
3964 }
3965
3966 DLLLOCAL void getUsageInfo(QoreHashNode& h) const {
3967 if (warn_queue) {
3968 h.setKeyValue("arg", warn_callback_arg.refSelf(), 0);
3969 h.setKeyValue("timeout", tl_warning_us, 0);
3970 h.setKeyValue("min_throughput", (int64)tp_warning_bs, 0);
3971 h.setKeyValue("min_throughput_us", (int64)tp_us_min, 0);
3972 }
3973
3974 h.setKeyValue("bytes_sent", tp_bytes_sent, 0);
3975 h.setKeyValue("bytes_recv", tp_bytes_recv, 0);
3976 h.setKeyValue("us_sent", tp_us_sent, 0);
3977 h.setKeyValue("us_recv", tp_us_recv, 0);
3978 }
3979
3980 DLLLOCAL QoreHashNode* getUsageInfo() const {
3981 QoreHashNode* h = new QoreHashNode(autoTypeInfo);
3982 getUsageInfo(*h);
3983 return h;
3984 }
3985
3986 DLLLOCAL void clearStats() {
3987 tp_bytes_sent = 0;
3988 tp_bytes_recv = 0;
3989 tp_us_sent = 0;
3990 tp_us_recv = 0;
3991 }
3992
3993 DLLLOCAL void doTimeoutWarning(const char* op, int64 dt) {
3994 assert(warn_queue);
3995 assert(dt > tl_warning_us);
3996
3997 QoreHashNode* h = new QoreHashNode(autoTypeInfo);
3998
3999 h->setKeyValue("type", new QoreStringNode("SOCKET-OPERATION-WARNING"), 0);
4000 h->setKeyValue("operation", new QoreStringNode(op), 0);
4001 h->setKeyValue("us", dt, 0);
4002 h->setKeyValue("timeout", tl_warning_us, 0);
4003 if (warn_callback_arg)
4004 h->setKeyValue("arg", warn_callback_arg.refSelf(), 0);
4005
4006 warn_queue->pushAndTakeRef(h);
4007 }
4008
4009 DLLLOCAL void doThroughputWarning(bool send, int64 bytes, int64 dt, double bs) {
4010 assert(warn_queue);
4011 assert(bs < tp_warning_bs);
4012
4013 QoreHashNode* h = new QoreHashNode(autoTypeInfo);
4014
4015 h->setKeyValue("type", new QoreStringNode("SOCKET-THROUGHPUT-WARNING"), 0);
4016 h->setKeyValue("dir", new QoreStringNode(send ? "send" : "recv"), 0);
4017 h->setKeyValue("bytes", bytes, 0);
4018 h->setKeyValue("us", dt, 0);
4019 h->setKeyValue("bytes_sec", bs, 0);
4020 h->setKeyValue("threshold", (int64)tp_warning_bs, 0);
4021 if (warn_callback_arg)
4022 h->setKeyValue("arg", warn_callback_arg.refSelf(), 0);
4023
4024 warn_queue->pushAndTakeRef(h);
4025 }
4026
4027 DLLLOCAL bool pendingHttpChunkedBody() const {
4028 return http_exp_chunked_body && sock != QORE_INVALID_SOCKET;
4029 }
4030
4031 DLLLOCAL void setSslVerifyMode(int mode) {
4032 //printd(5, "qore_socket_private::setSslVerifyMode() this: %p mode: %d\n", this, mode);
4033 ssl_verify_mode = mode;
4034 if (ssl)
4035 ssl->setVerifyMode(ssl_verify_mode, ssl_accept_all_certs, client_target);
4036 }
4037
4038 DLLLOCAL void acceptAllCertificates(bool accept_all = true) {
4039 ssl_accept_all_certs = accept_all;
4040 if (ssl)
4041 ssl->setVerifyMode(ssl_verify_mode, ssl_accept_all_certs, client_target);
4042 }
4043
4044 DLLLOCAL void setSslErrorString(QoreStringNode* err_str) {
4045 if (ssl_err_str) {
4046 ssl_err_str->concat("; ");
4047 ssl_err_str->concat(err_str);
4048 err_str->deref();
4049 } else {
4050 ssl_err_str = err_str;
4051 }
4052 }
4053
4054 DLLLOCAL static void getUsageInfo(const QoreSocket& sock, QoreHashNode& h, const QoreSocket& s) {
4055 sock.priv->getUsageInfo(h, *s.priv);
4056 }
4057
4058 DLLLOCAL static qore_socket_private* get(QoreSocket& sock) {
4059 return sock.priv;
4060 }
4061
4062 DLLLOCAL static const qore_socket_private* get(const QoreSocket& sock) {
4063 return sock.priv;
4064 }
4065
4066 DLLLOCAL static void captureRemoteCert(X509_STORE_CTX* x509_ctx);
4067
4068 DLLLOCAL static QoreListNode* poll(const QoreListNode* poll_list, int timeout_ms, ExceptionSink* xsink);
4069};
4070
4071#endif
DLLEXPORT const QoreEncoding * QCS_DEFAULT
the default encoding for the Qore library
DLLEXPORT const QoreEncoding * QCS_UTF8
UTF-8 multi-byte encoding (only UTF-8 and UTF-16 are multi-byte encodings)
Definition QoreEncoding.h:247
DLLEXPORT QoreEncodingManager QEM
the QoreEncodingManager object
DLLEXPORT QoreStringNode * q_strerror(int errnum)
returns the error string as a QoreStringNode
#define QORE_MIN(a, b)
macro to return the minimum of 2 numbers
Definition QoreLib.h:616
static void strtolower(char *str)
convert a string to lower-case in place
Definition QoreLib.h:268
The base class for all value and parse types in Qore expression trees.
Definition AbstractQoreNode.h:57
DLLEXPORT AbstractQoreNode * refSelf() const
returns "this" with an incremented reference count
virtual DLLEXPORT AbstractQoreNode * realCopy() const =0
returns a copy of the object; the caller owns the reference count
DLLEXPORT void deref(ExceptionSink *xsink)
decrements the reference count and calls derefImpl() if there_can_be_only_one is false,...
provides a safe and exception-safe way to release and re-acquire locks in Qore, only to be used on th...
Definition QoreThreadLock.h:190
holds arbitrary binary data
Definition BinaryNode.h:41
DLLEXPORT void append(const void *nptr, size_t size)
resizes the object and appends a copy of the data passed to the object
DLLEXPORT size_t size() const
returns the number of bytes in the object
DLLEXPORT bool empty() const
returns true if empty
DLLEXPORT const void * getPtr() const
returns the pointer to the data
constant iterator class for QoreHashNode, to be only created on the stack
Definition QoreHashNode.h:606
For use on the stack only: iterates through elements of a const QoreListNode.
Definition QoreListNode.h:575
container for holding Qore-language exception information and also for registering a "thread_exit" ca...
Definition ExceptionSink.h:50
DLLEXPORT int appendLastDescription(const char *fmt,...)
appends a formatted string to the top exception description if the desc value is a string
DLLEXPORT AbstractQoreNode * raiseException(const char *err, const char *fmt,...)
appends a Qore-language exception to the list
DLLEXPORT AbstractQoreNode * raiseErrnoException(const char *err, int en, const char *fmt,...)
appends a Qore-language exception to the list and appends the result of strerror(errno) to the descri...
DLLEXPORT AbstractQoreNode * raiseExceptionArg(const char *err, QoreValue arg, const char *fmt,...)
appends a Qore-language exception to the list, and sets the 'arg' member (this object takes over the ...
Interface for private data of input streams.
Definition InputStream.h:44
virtual int64 read(void *ptr, int64 limit, ExceptionSink *xsink)=0
Reads up to `limit` bytes from the input stream.
Interface for private data of output streams.
Definition OutputStream.h:44
virtual void write(const void *ptr, int64 count, ExceptionSink *xsink)=0
Writes bytes to the output stream.
provides an interface to getaddrinfo
Definition QoreNet.h:132
DLLLOCAL hashdecl addrinfo * getAddrInfo() const
returns the hashdecl addrinfo * being managed (may be 0)
Definition QoreNet.h:159
static DLLEXPORT QoreStringNode * getAddressDesc(int address_family, const char *addr)
returns a descriptive string for the address family and an address string (ie AF_INET6,...
DLLEXPORT int getInfo(ExceptionSink *xsink, const char *node, const char *service, int family=Q_AF_UNSPEC, int flags=0, int socktype=Q_SOCK_STREAM, int protocol=0)
get address info with the given parameters, if any errors occur, a Qore-language exception is thrown
static DLLEXPORT const char * getFamilyName(int address_family)
returns the name of the address family as a string (ie AF_INET = "ipv4", etc)
defines string encoding functions in Qore
Definition QoreEncoding.h:83
static DLLEXPORT const QoreEncoding * findCreate(const char *name)
finds an encoding if it exists (also looks up against alias names) and creates a new one if it doesn'...
This is the hash or associative list container type in Qore, dynamically allocated only,...
Definition QoreHashNode.h:51
DLLEXPORT int setKeyValue(const char *key, QoreValue value, ExceptionSink *xsink)
sets the value of "key" to "value"
DLLEXPORT QoreHashNode * hashRefSelf() const
returns "this" with an incremented reference count
DLLEXPORT size_t size() const
returns the number of members in the hash, executes in constant time
DLLEXPORT bool empty() const
returns true if the hash has no members, false if not
DLLEXPORT QoreHashNode * copy() const
performs a copy of the hash and returns the new hash
This is the list container type in Qore, dynamically allocated only, reference counted.
Definition QoreListNode.h:52
DLLEXPORT int push(QoreValue val, ExceptionSink *xsink)
adds a value to the list
Qore's arbitrary-precision number value type, dynamically-allocated only, reference counted.
Definition QoreNumberNode.h:51
the implementation of Qore's object data type, reference counted, dynamically-allocated only
Definition QoreObject.h:61
DLLEXPORT AbstractPrivateData * getReferencedPrivateData(qore_classid_t key, ExceptionSink *xsink) const
returns the private data corresponding to the class ID passed with an incremented reference count,...
DLLEXPORT void setValue(const char *key, QoreValue val, ExceptionSink *xsink)
sets the value of the given member to the given value
represents an X509 certificate, reference-counted, dynamically-allocated only
Definition QoreSSLCertificate.h:42
provides access to a private key data structure for SSL connections
Definition QoreSSLPrivateKey.h:40
DLLEXPORT double getAsFloat() const
returns the value as a float
DLLLOCAL detail::QoreValueCastHelper< T >::Result get()
returns the value as the given type
Definition QoreValue.h:214
DLLEXPORT qore_type_t getType() const
returns the type of value contained
DLLEXPORT const char * getTypeName() const
returns a string type description of the value contained (ex: "nothing" for a null AbstractQoreNode p...
DLLEXPORT bool getAsBool() const
returns the value as a bool
DLLEXPORT int64 getAsBigInt() const
returns the value as an int
DLLEXPORT void clear()
unconditionally set the QoreValue to QoreNothingNode (does not dereference any possible contained Abs...
provides access to sockets using Qore data structures
Definition QoreSocket.h:129
Qore's string type supported by the QoreEncoding class.
Definition QoreString.h:93
DLLEXPORT void set(const char *str, const QoreEncoding *new_qorecharset=QCS_DEFAULT)
copies the c-string passed and sets the value of the string and its encoding
DLLEXPORT const char * c_str() const
returns the string's buffer; this data should not be changed
DLLEXPORT size_t strlen() const
returns the number of bytes in the string (not including the null terminator)
DLLEXPORT void clear()
reset string to zero length; memory is not deallocated; string encoding does not change
DLLEXPORT char * giveBuffer()
returns the character buffer and leaves the QoreString empty, the caller owns the memory returned (mu...
DLLEXPORT void concat(const QoreString *str, ExceptionSink *xsink)
concatenates a string and converts encodings if necessary
DLLEXPORT int sprintf(const char *fmt,...)
this will concatenate a formatted string to the existing string according to the format string and t...
DLLEXPORT size_t size() const
returns the number of bytes in the string (not including the null terminator)
DLLEXPORT void trim(const char *chars=0)
remove leading and trailing whitespace or other characters
DLLEXPORT bool empty() const
returns true if the string is empty, false if not
Qore's string value type, reference counted, dynamically-allocated only.
Definition QoreStringNode.h:50
provides a mutually-exclusive thread lock
Definition QoreThreadLock.h:49
a templated class to manage a reference count of an object that can throw a Qore-language exception w...
Definition ReferenceHolder.h:52
base class for resolved call references
Definition CallReferenceNode.h:115
virtual DLLLOCAL QoreValue execValue(const QoreListNode *args, ExceptionSink *xsink) const =0
pure virtual function for executing the function reference
manages a reference count of a pointer to a class that takes a simple "deref()" call with no argument...
Definition ReferenceHolder.h:127
a helper class for getting socket origination information
Definition QoreSocket.h:76
Interface for private data of transformations.
Definition Transform.h:40
holds an object and dereferences it in the destructor
Definition QoreValue.h:487
unsigned qore_classid_t
used for the unique class ID for QoreClass objects
Definition common.h:85
long long int64
64-bit integer type, cannot use int64_t here since it breaks the API on some 64-bit systems due to equ...
Definition common.h:266
const qore_type_t NT_BOOLEAN
type value for bools (QoreValue only)
Definition node_types.h:47
const qore_type_t NT_NUMBER
type value for QoreNumberNode
Definition node_types.h:53
const qore_type_t NT_BINARY
type value for BinaryNode
Definition node_types.h:49
const qore_type_t NT_LIST
type value for QoreListNode
Definition node_types.h:50
const qore_type_t NT_NULL
type value for QoreNullNode
Definition node_types.h:48
const qore_type_t NT_INT
type value for integers (QoreValue only)
Definition node_types.h:43
const qore_type_t NT_STRING
type value for QoreStringNode
Definition node_types.h:45
const qore_type_t NT_FLOAT
type value for floating-point values (QoreValue only)
Definition node_types.h:44
const qore_type_t NT_HASH
type value for QoreHashNode
Definition node_types.h:51
const qore_type_t NT_NOTHING
type value for QoreNothingNode
Definition node_types.h:42
DLLEXPORT int q_gettid() noexcept
returns the current TID number
The main value class in Qore, designed to be passed by value.
Definition QoreValue.h:279
DLLEXPORT void discard(ExceptionSink *xsink)
dereferences any contained AbstractQoreNode pointer and sets to 0; does not modify other values
DLLEXPORT QoreValue refSelf() const
references the contained value if type == QV_Node, returns itself