32 #ifndef _QORE_QORE_SOCKET_PRIVATE_H 33 #define _QORE_QORE_SOCKET_PRIVATE_H 35 #include "qore/intern/SSLSocketHelper.h" 37 #include "qore/intern/QC_Queue.h" 46 #include <openssl/ssl.h> 47 #include <openssl/err.h> 51 #elif defined HAVE_SYS_SELECT_H 52 #include <sys/select.h> 53 #elif (defined _WIN32 || defined __WIN32__) && ! defined __CYGWIN__ 56 #error no async socket I/O APIs available 59 #ifndef DEFAULT_SOCKET_BUFSIZE 60 #define DEFAULT_SOCKET_BUFSIZE 4096 63 #ifndef QORE_MAX_HEADER_SIZE 64 #define QORE_MAX_HEADER_SIZE 16384 67 #define CHF_HTTP11 (1 << 0) 68 #define CHF_PROCESS (1 << 1) 69 #define CHF_REQUEST (1 << 2) 71 #ifndef DEFAULT_SOCKET_MIN_THRESHOLD_BYTES 72 #define DEFAULT_SOCKET_MIN_THRESHOLD_BYTES 1024 75 DLLLOCAL
void concat_target(
QoreString& str,
const struct sockaddr *addr,
const char* type =
"target");
77 DLLLOCAL
int sock_get_raw_error();
78 DLLLOCAL
int sock_get_error();
79 DLLLOCAL
void qore_socket_error(
ExceptionSink* xsink,
const char* err,
const char* cdesc,
const char* mname = 0,
const char* host = 0,
const char* svc = 0,
const struct sockaddr *addr = 0);
80 DLLLOCAL
void qore_socket_error_intern(
int rc,
ExceptionSink* xsink,
const char* err,
const char* cdesc,
const char* mname = 0,
const char* host = 0,
const char* svc = 0,
const struct sockaddr *addr = 0);
81 DLLLOCAL
void se_in_op(
const char* cname,
const char* meth,
ExceptionSink* xsink);
82 DLLLOCAL
void se_in_op_thread(
const char* cname,
const char* meth,
ExceptionSink* xsink);
83 DLLLOCAL
void se_not_open(
const char* cname,
const char* meth,
ExceptionSink* xsink);
84 DLLLOCAL
void se_timeout(
const char* cname,
const char* meth,
int timeout_ms,
ExceptionSink* xsink);
85 DLLLOCAL
void se_closed(
const char* cname,
const char* mname,
ExceptionSink* xsink);
88 #define GETSOCKOPT_ARG_4 char* 89 #define SETSOCKOPT_ARG_4 const char* 90 #define SHUTDOWN_ARG SD_BOTH 91 #define QORE_INVALID_SOCKET ((int)INVALID_SOCKET) 92 #define QORE_SOCKET_ERROR SOCKET_ERROR 93 DLLLOCAL
int check_windows_rc(
int rc);
96 #define ECONNRESET WSAECONNRESET 101 #define GETSOCKOPT_ARG_4 void* 102 #define SETSOCKOPT_ARG_4 void* 103 #define SHUTDOWN_ARG SHUT_RDWR 104 #define QORE_INVALID_SOCKET -1 105 #define QORE_SOCKET_ERROR -1 108 hashdecl qore_socketsource_private {
112 DLLLOCAL qore_socketsource_private() : address(0), hostname(0) {
115 DLLLOCAL ~qore_socketsource_private() {
116 if (address) address->deref();
117 if (hostname) hostname->deref();
125 DLLLOCAL
void setAddress(
const char* addr) {
130 DLLLOCAL
void setHostName(
const char* host) {
137 o->
setValue(
"source", address, xsink);
142 o->
setValue(
"source_host", hostname, xsink);
148 class OptionalNonBlockingHelper {
150 qore_socket_private& sock;
154 DLLLOCAL OptionalNonBlockingHelper(qore_socket_private& s,
bool n_set,
ExceptionSink* xs);
155 DLLLOCAL ~OptionalNonBlockingHelper();
158 class PrivateQoreSocketTimeoutBase {
160 hashdecl qore_socket_private* sock;
164 DLLLOCAL PrivateQoreSocketTimeoutBase(qore_socket_private* s) : sock(s), start(sock ? q_clock_getmicros() : 0) {
168 class PrivateQoreSocketTimeoutHelper :
public PrivateQoreSocketTimeoutBase {
172 DLLLOCAL PrivateQoreSocketTimeoutHelper(qore_socket_private* s,
const char* op);
173 DLLLOCAL ~PrivateQoreSocketTimeoutHelper();
176 class PrivateQoreSocketThroughputHelper :
public PrivateQoreSocketTimeoutBase {
180 DLLLOCAL PrivateQoreSocketThroughputHelper(qore_socket_private* s,
bool snd);
181 DLLLOCAL ~PrivateQoreSocketThroughputHelper();
183 DLLLOCAL
void finalize(
int64 bytes);
186 hashdecl qore_socket_private;
188 hashdecl qore_socket_op_helper {
190 qore_socket_private* s;
193 DLLLOCAL qore_socket_op_helper(qore_socket_private* sock);
194 DLLLOCAL ~qore_socket_op_helper();
197 class SSLSocketHelperHelper {
199 qore_socket_private* s;
200 SSLSocketHelper* ssl;
201 bool context_saved =
false;
204 DLLLOCAL SSLSocketHelperHelper(qore_socket_private* sock,
bool set_thread_context =
false);
206 DLLLOCAL ~SSLSocketHelperHelper();
208 DLLLOCAL
void error();
211 hashdecl qore_socket_private {
212 friend class PrivateQoreSocketTimeoutHelper;
213 friend class PrivateQoreSocketThroughputHelper;
216 static thread_local qore_socket_private* current_socket;
218 int sock, sfamily, port, stype, sprot;
221 int64 connection_id = 0;
225 std::string socketname;
227 std::string client_target;
228 SSLSocketHelper* ssl =
nullptr;
229 Queue* cb_queue =
nullptr,
230 * warn_queue =
nullptr;
233 char rbuf[DEFAULT_SOCKET_BUFSIZE];
239 int64 tl_warning_us = 0;
240 double tp_warning_bs = 0;
241 int64 tp_bytes_sent = 0,
250 http_exp_chunked_body =
false,
251 ssl_accept_all_certs =
false,
252 ssl_capture_remote_cert =
false;
254 ssl_verify_mode = SSL_VERIFY_NONE;
259 DLLLOCAL qore_socket_private(
int n_sock = QORE_INVALID_SOCKET,
int n_sfamily = AF_UNSPEC,
int n_stype = SOCK_STREAM,
int n_prot = 0,
const QoreEncoding* n_enc =
QCS_DEFAULT) :
260 sock(n_sock), sfamily(n_sfamily), port(-1), stype(n_stype), sprot(n_prot), enc(n_enc) {
263 DLLLOCAL ~qore_socket_private() {
271 DLLLOCAL
bool isOpen() {
272 return sock != QORE_INVALID_SOCKET;
275 DLLLOCAL
int close() {
276 int rc = close_internal();
279 if (http_exp_chunked_body)
280 http_exp_chunked_body =
false;
288 DLLLOCAL
int close_and_reset() {
289 assert(sock != QORE_INVALID_SOCKET);
293 rc = ::closesocket(sock);
298 if (!rc || sock_get_error() != EINTR)
302 sock = QORE_INVALID_SOCKET;
312 client_target.clear();
316 DLLLOCAL
int close_internal() {
319 remote_cert->
deref(
nullptr);
320 remote_cert =
nullptr;
330 if (!socketname.empty()) {
332 unlink(socketname.c_str());
340 return close_and_reset();
346 DLLLOCAL
int getSendTimeout()
const {
349 #if defined(HPUX) && defined(__ia64) && defined(__LP64__) 352 int size =
sizeof(
hashdecl timeval);
354 socklen_t size =
sizeof(
hashdecl timeval);
357 if (getsockopt(sock, SOL_SOCKET, SO_SNDTIMEO, (GETSOCKOPT_ARG_4)&tv, (socklen_t *)&size))
360 return tv.tv_sec * 1000 + tv.tv_usec / 1000;
363 DLLLOCAL
int getRecvTimeout()
const {
366 #if defined(HPUX) && defined(__ia64) && defined(__LP64__) 369 int size =
sizeof(
hashdecl timeval);
371 socklen_t size =
sizeof(
hashdecl timeval);
374 if (getsockopt(sock, SOL_SOCKET, SO_RCVTIMEO, (GETSOCKOPT_ARG_4)&tv, (socklen_t *)&size))
377 return tv.tv_sec * 1000 + tv.tv_usec / 1000;
380 DLLLOCAL
int getPort() {
382 if (sock == QORE_INVALID_SOCKET || (sfamily != AF_INET && sfamily != AF_INET6) || port > 0)
386 hashdecl sockaddr_storage addr;
387 #if defined(HPUX) && defined(__ia64) && defined(__LP64__) 389 int size =
sizeof addr;
391 socklen_t size =
sizeof addr;
394 if (getsockname(sock, (
struct sockaddr *)&addr, (socklen_t *)&size) < 0)
397 port = q_get_port_from_addr((
const struct sockaddr *)&addr);
402 switch (v.getType()) {
407 hdr.
sprintf(
"%s: " QLLD
"\r\n", key, v.getAsBigInt());
411 size_t offset = hdr.
size();
412 hdr.
sprintf(
"%f\r\n", v.getAsFloat());
416 q_fix_decimal(&hdr, offset);
425 hdr.
sprintf(
"%s: %d\r\n", key, (
int)v.getAsBool());
438 const char* key = hi.getKey();
439 if (addsize && !strcasecmp(key,
"transfer-encoding"))
441 if (addsize && !strcasecmp(key,
"content-length"))
446 do_header(key, hdr, li.getValue());
449 do_header(key, hdr, v);
454 hdr.
sprintf(
"Content-Length: " QSD
"\r\n", size);
459 DLLLOCAL
int listen(
int backlog = 20) {
460 if (sock == QORE_INVALID_SOCKET)
465 if (::listen(sock, backlog)) {
472 return ::listen(sock, backlog);
476 DLLLOCAL
int accept_intern(
ExceptionSink* xsink,
struct sockaddr *addr, socklen_t *size,
int timeout_ms = -1) {
480 if (timeout_ms >= 0 && !isDataAvailable(timeout_ms,
"accept", xsink)) {
487 int rc = ::accept(sock, addr, size);
488 if (rc != QORE_INVALID_SOCKET)
492 if (sock_get_error() == EINTR)
495 qore_socket_error(xsink,
"SOCKET-ACCEPT-ERROR",
"error in accept()", 0, 0, 0, addr);
503 if (sock == QORE_INVALID_SOCKET) {
506 xsink->
raiseException(
"SOCKET-NOT-OPEN",
"socket must be opened, bound, and in a listening state before new connections can be accepted");
511 se_in_op(
"Socket",
"accept", xsink);
514 se_in_op_thread(
"Socket",
"accept", xsink);
515 return QSE_IN_OP_THREAD;
519 if (sfamily == AF_UNIX) {
523 xsink->
raiseException(
"SOCKET-ACCEPT-ERROR",
"UNIX sockets are not available under Windows");
526 hashdecl sockaddr_un addr_un;
528 #if defined(HPUX) && defined(__ia64) && defined(__LP64__) 531 int size =
sizeof(
hashdecl sockaddr_un);
533 socklen_t size =
sizeof(
hashdecl sockaddr_un);
535 rc = accept_intern(xsink, (
struct sockaddr *)&addr_un, (socklen_t *)&size, timeout_ms);
538 if (rc >= 0 && source) {
540 addr->
sprintf(
"UNIX socket: %s", socketname.c_str());
541 source->priv->setAddress(addr);
542 source->priv->setHostName(
"localhost");
545 }
else if (sfamily == AF_INET || sfamily == AF_INET6) {
546 hashdecl sockaddr_storage addr_in;
547 #if defined(HPUX) && defined(__ia64) && defined(__LP64__) 550 int size =
sizeof(addr_in);
552 socklen_t size =
sizeof(addr_in);
555 rc = accept_intern(xsink, (
struct sockaddr *)&addr_in, (socklen_t *)&size, timeout_ms);
558 if (rc >= 0 && source) {
559 char host[NI_MAXHOST + 1];
560 char service[NI_MAXSERV + 1];
562 if (!getnameinfo((
struct sockaddr *)&addr_in, qore_get_in_len((
struct sockaddr *)&addr_in), host,
sizeof(host), service,
sizeof(service), NI_NUMERICSERV)) {
563 source->priv->setHostName(host);
567 char ifname[INET6_ADDRSTRLEN];
568 if (inet_ntop(addr_in.ss_family, qore_get_in_addr((
struct sockaddr *)&addr_in), ifname,
sizeof(ifname))) {
570 source->priv->setAddress(ifname);
577 xsink->
raiseException(
"SOCKET-ACCEPT-ERROR",
"do not know how to accept connections with address family %d", sfamily);
593 cb_queue->pushAndTakeRef(h);
596 cb_queue->deref(xsink);
600 warn_queue->deref(xsink);
609 DLLLOCAL
void setEventQueue(Queue* cbq,
ExceptionSink* xsink) {
611 cb_queue->deref(xsink);
615 DLLLOCAL
void do_start_ssl_event() {
621 cb_queue->pushAndTakeRef(h);
625 DLLLOCAL
void do_ssl_established_event() {
628 h->
setKeyValue(
"event", QORE_EVENT_SSL_ESTABLISHED, 0);
633 cb_queue->pushAndTakeRef(h);
637 DLLLOCAL
void do_connect_event(
int af,
const struct sockaddr* addr,
const char* target,
const char* service = 0,
int prt = -1) {
648 q_af_to_hash(af, *h, 0);
654 cb_queue->pushAndTakeRef(h);
658 DLLLOCAL
void do_connected_event() {
664 cb_queue->pushAndTakeRef(h);
674 if (event == QORE_EVENT_HTTP_CHUNKED_DATA_RECEIVED)
679 cb_queue->pushAndTakeRef(h);
683 DLLLOCAL
void do_read_http_header(
int event,
const QoreHashNode* headers,
int source) {
690 cb_queue->pushAndTakeRef(h);
697 h->
setKeyValue(
"event", QORE_EVENT_HTTP_SEND_MESSAGE, 0);
703 cb_queue->pushAndTakeRef(h);
707 DLLLOCAL
void do_close_event() {
710 h->
setKeyValue(
"event", QORE_EVENT_CHANNEL_CLOSED, 0);
713 cb_queue->pushAndTakeRef(h);
721 h->
setKeyValue(
"event", QORE_EVENT_PACKET_READ, 0);
729 cb_queue->pushAndTakeRef(h);
733 DLLLOCAL
void do_send_event(
int bytes_sent,
int total_sent,
int bufsize) {
737 h->
setKeyValue(
"event", QORE_EVENT_PACKET_SENT, 0);
743 cb_queue->pushAndTakeRef(h);
747 DLLLOCAL
void do_resolve_event(
const char* host,
const char* service = 0) {
751 h->
setKeyValue(
"event", QORE_EVENT_HOSTNAME_LOOKUP, 0);
758 cb_queue->pushAndTakeRef(h);
762 DLLLOCAL
void do_resolved_event(
const struct sockaddr* addr) {
766 h->
setKeyValue(
"event", QORE_EVENT_HOSTNAME_RESOLVED, 0);
774 int prt = q_get_port_from_addr(addr);
777 q_af_to_hash(addr->sa_family, *h, 0);
778 cb_queue->pushAndTakeRef(h);
782 DLLLOCAL
int64 getObjectIDForEvents()
const {
786 DLLLOCAL
int connectUNIX(
const char* p,
int sock_type,
int protocol,
ExceptionSink* xsink) {
789 QORE_TRACE(
"connectUNIX()");
794 xsink->
raiseException(
"SOCKET-CONNECTUNIX-ERROR",
"UNIX sockets are not available under Windows");
800 printd(5,
"qore_socket_private::connectUNIX(%s)\n", p);
802 hashdecl sockaddr_un addr;
804 addr.sun_family = AF_UNIX;
806 strncpy(addr.sun_path, p,
sizeof(addr.sun_path) - 1);
807 addr.sun_path[
sizeof(addr.sun_path) - 1] =
'\0';
808 if ((sock = socket(AF_UNIX, sock_type, protocol)) == QORE_SOCKET_ERROR) {
811 xsink->
raiseErrnoException(
"SOCKET-CONNECT-ERROR", errno,
"error connecting to UNIX socket: '%s'", p);
815 do_connect_event(AF_UNIX, (sockaddr*)&addr, p);
817 if (!::connect(sock, (
const sockaddr *)&addr,
sizeof(
struct sockaddr_un)))
821 if (sock_get_error() == EINTR)
827 qore_socket_error(xsink,
"SOCKET-CONNECT-ERROR",
"error in connect()", 0, p);
833 socketname = addr.sun_path;
836 do_connected_event();
848 DLLLOCAL
int asyncIoWait(
int timeout_ms,
bool read,
bool write,
const char* cname,
const char* mname,
ExceptionSink* xsink)
const {
850 assert(read || write);
851 if (sock == QORE_INVALID_SOCKET) {
852 se_not_open(cname, mname, xsink);
856 return asyncIoWait(timeout_ms, read, write, xsink);
859 DLLLOCAL
int asyncIoWait(
int timeout_ms,
bool read,
bool write,
ExceptionSink* xsink)
const {
861 #if defined HAVE_POLL 862 return poll_intern(xsink, timeout_ms, read, write);
863 #elif defined HAVE_SELECT 864 return select_intern(xsink, timeout_ms, read, write);
866 #error no async socket operations supported 870 #if defined HAVE_POLL 871 DLLLOCAL
int poll_intern(
ExceptionSink* xsink,
int timeout_ms,
bool read,
bool write)
const {
878 pollfd fds = {sock, arg, 0};
880 rc = poll(&fds, 1, timeout_ms);
881 if (rc == -1 && errno == EINTR)
886 qore_socket_error(xsink,
"SOCKET-SELECT-ERROR",
"poll(2) returned an error");
887 else if (!rc && ((fds.revents & POLLHUP) || (fds.revents & (POLLERR|POLLNVAL))))
892 #elif defined HAVE_SELECT 893 DLLLOCAL
int select_intern(
ExceptionSink* xsink,
int timeout_ms,
bool read,
bool write)
const {
894 bool aborted =
false;
895 int rc = select_intern(xsink, timeout_ms, read, write, aborted);
896 if (rc != QORE_SOCKET_ERROR && aborted)
901 DLLLOCAL
int select_intern(
ExceptionSink* xsink,
int timeout_ms,
bool read,
bool write,
bool& aborted)
const {
908 if (sock >= FD_SETSIZE) {
911 xsink->
raiseException(
"SOCKET-SELECT-ERROR",
"fd is %d which is >= %d; contact the Qore developers to implement an alternative to select() on this platform", sock, FD_SETSIZE);
926 tv.tv_sec = timeout_ms / 1000;
927 tv.tv_usec = (timeout_ms % 1000) * 1000;
929 fd_set* readfd = read ? &sfs : 0;
930 fd_set* writefd = write ? &sfs : 0;
932 rc = select(sock + 1, readfd, writefd, &err, &tv);
934 if (rc != QORE_SOCKET_ERROR) {
935 if (FD_ISSET(sock, &err))
939 if (sock_get_error() != EINTR)
942 if (rc == QORE_SOCKET_ERROR) {
945 qore_socket_error(xsink,
"SOCKET-SELECT-ERROR",
"select(2) returned an error");
952 DLLLOCAL
bool tryReadSocketData(
const char* mname,
ExceptionSink* xsink) {
957 return asyncIoWait(0,
true,
false,
"Socket", mname, xsink);
961 int rc = ssl->doSSLRW(xsink, mname, rbuf, 1, 0, PEEK,
false);
962 if (*xsink || (rc == QSE_TIMEOUT)) {
965 return rc > 0 ? true :
false;
968 DLLLOCAL
bool isSocketDataAvailable(
int timeout_ms,
const char* mname,
ExceptionSink* xsink) {
969 return asyncIoWait(timeout_ms,
true,
false,
"Socket", mname, xsink);
972 DLLLOCAL
bool isDataAvailable(
int timeout_ms,
const char* mname,
ExceptionSink* xsink) {
975 return isSocketDataAvailable(timeout_ms, mname, xsink);
978 DLLLOCAL
bool isWriteFinished(
int timeout_ms,
const char* mname,
ExceptionSink* xsink) {
979 return asyncIoWait(timeout_ms,
false,
true,
"Socket", mname, xsink);
982 DLLLOCAL
int close_and_exit() {
983 if (sock != QORE_INVALID_SOCKET)
988 DLLLOCAL
int connectINETTimeout(
int timeout_ms,
const struct sockaddr* ai_addr,
qore_size_t ai_addrlen,
ExceptionSink* xsink,
bool only_timeout) {
990 PrivateQoreSocketTimeoutHelper toh(
this,
"connect");
993 if (!::connect(sock, ai_addr, ai_addrlen))
997 if (sock_get_error() != EAGAIN) {
998 qore_socket_error(xsink,
"SOCKET-CONNECT-ERROR",
"error in connect()", 0, 0, 0, ai_addr);
1006 if (errno != EINPROGRESS)
1015 bool aborted =
false;
1016 int rc = select_intern(xsink, timeout_ms,
false,
true, aborted);
1022 if (rc != QORE_SOCKET_ERROR && aborted) {
1023 qore_socket_error(xsink,
"SOCKET-CONNECT-ERROR",
"error in connect()", 0, 0, 0, ai_addr);
1027 int rc = asyncIoWait(timeout_ms,
false,
true,
"Socket",
"connectINETTimeout", xsink);
1033 if (rc == QORE_SOCKET_ERROR && sock_get_error() != EINTR) {
1035 qore_socket_error(xsink,
"SOCKET-CONNECT-ERROR",
"error in asyncIoWait() with Socket::connect() with timeout", 0, 0, 0, ai_addr);
1037 }
else if (rc > 0) {
1039 socklen_t lon =
sizeof(int);
1042 if (getsockopt(sock, SOL_SOCKET, SO_ERROR, (GETSOCKOPT_ARG_4)(&val), &lon) == QORE_SOCKET_ERROR) {
1044 qore_socket_error(xsink,
"SOCKET-CONNECT-ERROR",
"error in getsockopt()", 0, 0, 0, ai_addr);
1053 qore_socket_error_intern(val, xsink,
"SOCKET-CONNECT-ERROR",
"error in getsockopt()", 0, 0, 0, ai_addr);
1061 concat_target(*(*desc), ai_addr);
1073 DLLLOCAL
int sock_errno_err(
const char* err,
const char* desc,
ExceptionSink* xsink) {
1074 sock = QORE_INVALID_SOCKET;
1075 qore_socket_error(xsink, err, desc);
1079 DLLLOCAL
int set_non_blocking(
bool non_blocking,
ExceptionSink* xsink) {
1083 if (sock == QORE_INVALID_SOCKET) {
1084 assert(!xsink || *xsink);
1089 u_long mode = non_blocking ? 1 : 0;
1090 int rc = ioctlsocket(sock, FIONBIO, &mode);
1091 if (check_windows_rc(rc))
1092 return sock_errno_err(
"SOCKET-CONNECT-ERROR",
"error in ioctlsocket(FIONBIO)", xsink);
1097 if ((arg = fcntl(sock, F_GETFL, 0)) < 0)
1098 return sock_errno_err(
"SOCKET-CONNECT-ERROR",
"error in fcntl() getting socket descriptor status flag", xsink);
1105 if (fcntl(sock, F_SETFL, arg) < 0)
1106 return sock_errno_err(
"SOCKET-CONNECT-ERROR",
"error in fcntl() setting socket descriptor status flag", xsink);
1112 DLLLOCAL
int connectINET(
const char* host,
const char* service,
int timeout_ms,
ExceptionSink* xsink,
int family = AF_UNSPEC,
int type = SOCK_STREAM,
int protocol = 0) {
1114 family = q_get_af(family);
1115 type = q_get_sock_type(type);
1117 QORE_TRACE(
"qore_socket_private::connectINET()");
1122 printd(5,
"qore_socket_private::connectINET(%s:%s, %dms)\n", host, service, timeout_ms);
1124 do_resolve_event(host, service);
1127 if (ai.
getInfo(xsink, host, service, family, 0, type, protocol))
1134 for (
struct addrinfo *p = aip; p; p = p->ai_next)
1135 do_resolved_event(p->ai_addr);
1137 int prt = q_get_port_from_addr(aip->ai_addr);
1139 for (
struct addrinfo *p = aip; p; p = p->ai_next) {
1140 if (!connectINETIntern(host, service, p->ai_family, p->ai_addr, p->ai_addrlen, p->ai_socktype, p->ai_protocol, prt, timeout_ms, xsink,
true))
1147 qore_socket_error(xsink,
"SOCKET-CONNECT-ERROR",
"error in connect()", 0, host, service);
1151 DLLLOCAL
int connectINETIntern(
const char* host,
const char* service,
int ai_family,
struct sockaddr* ai_addr,
size_t ai_addrlen,
int ai_socktype,
int ai_protocol,
int prt,
int timeout_ms,
ExceptionSink* xsink,
bool only_timeout =
false) {
1153 printd(5,
"qore_socket_private::connectINETIntern() host: %s service: %s family: %d timeout_ms: %d\n", host, service, ai_family, timeout_ms);
1154 if ((sock = socket(ai_family, ai_socktype, ai_protocol)) == QORE_INVALID_SOCKET) {
1157 xsink->
raiseErrnoException(
"SOCKET-CONNECT-ERROR", errno,
"cannot establish a connection to %s:%s", host, service);
1166 if (timeout_ms >= 0) {
1168 if (set_non_blocking(
true, xsink))
1169 return close_and_exit();
1171 do_connect_event(ai_family, ai_addr, host, service, prt);
1173 rc = connectINETTimeout(timeout_ms, ai_addr, ai_addrlen, xsink, only_timeout);
1177 if (set_non_blocking(
false, xsink))
1178 return close_and_exit();
1180 do_connect_event(ai_family, ai_addr, host, service, prt);
1183 rc = ::connect(sock, ai_addr, ai_addrlen);
1186 if (!rc || sock_get_error() != EINTR)
1192 if (!only_timeout || errno == ETIMEDOUT)
1193 qore_socket_error(xsink,
"SOCKET-CONNECT-ERROR",
"error in connect()", 0, host, service);
1195 return close_and_exit();
1198 sfamily = ai_family;
1199 stype = ai_socktype;
1200 sprot = ai_protocol;
1204 do_connected_event();
1207 client_target = host;
1211 DLLLOCAL
int upgradeClientToSSLIntern(
const char* mname,
const char* sni_target_host, X509* cert, EVP_PKEY* pkey,
int timeout_ms,
ExceptionSink* xsink) {
1213 SSLSocketHelperHelper sshh(
this,
true);
1216 do_start_ssl_event();
1218 if (!sni_target_host && !client_target.empty()) {
1219 sni_target_host = client_target.c_str();
1221 if ((rc = ssl->setClient(mname, sni_target_host, sock, cert, pkey, xsink)) || ssl->connect(mname, timeout_ms, xsink)) {
1223 return rc ? rc : -1;
1225 do_ssl_established_event();
1230 DLLLOCAL
int upgradeServerToSSLIntern(
const char* mname, X509* cert, EVP_PKEY* pkey,
int timeout_ms,
ExceptionSink* xsink) {
1233 SSLSocketHelperHelper sshh(
this,
true);
1235 do_start_ssl_event();
1236 if (ssl->setServer(mname, sock, cert, pkey, xsink) || ssl->accept(mname, timeout_ms, xsink)) {
1240 do_ssl_established_event();
1246 DLLLOCAL
int openUNIX(
int sock_type = SOCK_STREAM,
int protocol = 0) {
1247 if (sock != QORE_INVALID_SOCKET)
1250 if ((sock = socket(AF_UNIX, sock_type, protocol)) == QORE_INVALID_SOCKET) {
1262 DLLLOCAL
int openINET(
int family = AF_INET,
int sock_type = SOCK_STREAM,
int protocol = 0) {
1263 if (sock != QORE_INVALID_SOCKET)
1266 if ((sock = socket(family, sock_type, protocol)) == QORE_INVALID_SOCKET)
1276 DLLLOCAL
int reuse(
int opt) {
1278 return setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (SETSOCKOPT_ARG_4)&opt,
sizeof(
int));
1282 DLLLOCAL
int bindIntern(
struct sockaddr* ai_addr,
size_t ai_addrlen,
int prt,
bool reuseaddr,
ExceptionSink* xsink = 0) {
1285 if ((::bind(sock, ai_addr, ai_addrlen)) == QORE_SOCKET_ERROR) {
1287 qore_socket_error(xsink,
"SOCKET-BIND-ERROR",
"error in bind()", 0, 0, 0, ai_addr);
1297 #if defined(HPUX) && defined(__ia64) && defined(__LP64__) 1299 int len = ai_addrlen;
1301 socklen_t len = ai_addrlen;
1304 if (getsockname(sock, ai_addr, &len))
1307 port = q_get_port_from_addr(ai_addr);
1313 DLLLOCAL
int bindUNIX(
ExceptionSink* xsink,
const char* name,
int socktype = SOCK_STREAM,
int protocol = 0) {
1318 xsink->
raiseException(
"SOCKET-BINDUNIX-ERROR",
"UNIX sockets are not available under Windows");
1324 if (openUNIX(socktype, protocol)) {
1327 xsink->
raiseErrnoException(
"SOCKET-BIND-ERROR", errno,
"error opening UNIX socket ('%s') for bind", name);
1331 hashdecl sockaddr_un addr;
1332 addr.sun_family = AF_UNIX;
1334 strncpy(addr.sun_path, name,
sizeof(addr.sun_path) - 1);
1335 addr.sun_path[
sizeof(addr.sun_path) - 1] =
'\0';
1337 if (bindIntern((sockaddr*)&addr,
sizeof(
struct sockaddr_un), -1,
false, xsink))
1341 socketname = addr.sun_path;
1348 DLLLOCAL
int bindINET(
ExceptionSink* xsink,
const char* name,
const char* service,
bool reuseaddr =
true,
int family = AF_UNSPEC,
int socktype = SOCK_STREAM,
int protocol = 0) {
1350 family = q_get_af(family);
1351 socktype = q_get_sock_type(socktype);
1356 do_resolve_event(name, service);
1357 if (ai.
getInfo(xsink, name, service, family, AI_PASSIVE, socktype, protocol))
1363 for (
struct addrinfo *p = aip; p; p = p->ai_next)
1364 do_resolved_event(p->ai_addr);
1367 if (openINET(aip->ai_family, aip->ai_socktype, protocol)) {
1368 qore_socket_error(xsink,
"SOCKET-BINDINET-ERROR",
"error opening socket for bind", 0, name, service);
1372 int prt = q_get_port_from_addr(aip->ai_addr);
1376 for (
struct addrinfo *p = aip; p; p = p->ai_next) {
1377 if (!bindIntern(p->ai_addr, p->ai_addrlen, prt, reuseaddr)) {
1382 en = sock_get_raw_error();
1387 qore_socket_error_intern(en, xsink,
"SOCKET-BIND-ERROR",
"error binding on socket", 0, name, service);
1394 if (sock == QORE_INVALID_SOCKET) {
1395 se_not_open(
"Socket",
"getPeerInfo", xsink);
1399 hashdecl sockaddr_storage addr;
1400 socklen_t len =
sizeof addr;
1401 if (getpeername(sock, (
struct sockaddr*)&addr, &len)) {
1402 qore_socket_error(xsink,
"SOCKET-GETPEERINFO-ERROR",
"error in getpeername()");
1406 return getAddrInfo(addr, len, host_lookup);
1412 if (sock == QORE_INVALID_SOCKET) {
1413 se_not_open(
"Socket",
"getSocketInfo", xsink);
1417 hashdecl sockaddr_storage addr;
1418 #if defined(HPUX) && defined(__ia64) && defined(__LP64__) 1420 int len =
sizeof addr;
1422 socklen_t len =
sizeof addr;
1425 if (getsockname(sock, (
struct sockaddr*)&addr, &len)) {
1426 qore_socket_error(xsink,
"SOCKET-GETSOCKETINFO-ERROR",
"error in getsockname()");
1430 return getAddrInfo(addr, len, host_lookup);
1433 DLLLOCAL
QoreHashNode* getAddrInfo(
const struct sockaddr_storage& addr, socklen_t len,
bool host_lookup =
true)
const {
1436 if (addr.ss_family == AF_INET || addr.ss_family == AF_INET6) {
1438 char host[NI_MAXHOST + 1];
1440 if (!getnameinfo((
struct sockaddr*)&addr, qore_get_in_len((
struct sockaddr*)&addr), host,
sizeof(host), 0, 0, 0)) {
1448 char ifname[INET6_ADDRSTRLEN];
1449 if (inet_ntop(addr.ss_family, qore_get_in_addr((
struct sockaddr*)&addr), ifname,
sizeof(ifname))) {
1456 if (addr.ss_family == AF_INET) {
1457 hashdecl sockaddr_in* s = (
hashdecl sockaddr_in*)&addr;
1458 tport = ntohs(s->sin_port);
1460 hashdecl sockaddr_in6* s = (
hashdecl sockaddr_in6*)&addr;
1461 tport = ntohs(s->sin6_port);
1467 else if (addr.ss_family == AF_UNIX) {
1468 assert(!socketname.empty());
1484 hashdecl sockaddr_storage addr;
1486 socklen_t len =
sizeof addr;
1487 if (getpeername(sock, (
struct sockaddr*)&addr, &len))
1490 if (addr.ss_family == AF_INET || addr.ss_family == AF_INET6) {
1492 char ifname[INET6_ADDRSTRLEN];
1493 if (inet_ntop(addr.ss_family, qore_get_in_addr((
struct sockaddr *)&addr), ifname,
sizeof(ifname))) {
1498 char host[NI_MAXHOST + 1];
1499 if (!getnameinfo((
struct sockaddr *)&addr, qore_get_in_len((
struct sockaddr *)&addr), host,
sizeof(host), 0, 0, 0))
1503 else if (addr.ss_family == AF_UNIX) {
1505 hashdecl sockaddr_un *addr_un = (
hashdecl sockaddr_un *)&addr;
1506 astr->
sprintf(
"UNIX socket: %s", addr_un->sun_path);
1517 assert(sock != QORE_INVALID_SOCKET);
1522 buf = rbuf + bufoffset;
1540 if (timeout != -1 && !isDataAvailable(timeout, meth, xsink)) {
1543 se_timeout(
"Socket", meth, timeout, xsink);
1552 rc = ::recv(sock, rbuf, DEFAULT_SOCKET_BUFSIZE, flags);
1553 if (rc == QORE_SOCKET_ERROR) {
1558 if (errno == ECONNRESET) {
1559 se_closed(
"Socket", meth, xsink);
1563 qore_socket_error(xsink,
"SOCKET-RECV-ERROR",
"error in recv()", meth);
1572 rc = ssl->read(meth, rbuf, DEFAULT_SOCKET_BUFSIZE, timeout, xsink);
1587 do_read_event(rc, rc);
1603 if (sock == QORE_INVALID_SOCKET) {
1604 se_not_open(
"Socket", meth, xsink);
1609 PrivateQoreSocketThroughputHelper th(
this,
false);
1624 rc = brecv(xsink, meth, buf, 1, 0, timeout,
false);
1632 se_closed(
"Socket", meth, xsink);
1634 xsink->
raiseExceptionArg(
"SOCKET-HTTP-ERROR", hdr.release(),
"socket closed on remote end while reading header data after reading " QSD
" byte%s", count, count == 1 ?
"" :
"s");
1640 if (++count == QORE_MAX_HEADER_SIZE) {
1643 xsink->
raiseException(
"SOCKET-HTTP-ERROR",
"header size cannot exceed " QSD
" bytes", count);
1654 if (exit_early && hdr->
empty())
1661 }
else if (c ==
'\r') {
1676 case 0: hdr->
concat(
'\r');
break;
1677 case 1: hdr->
concat(
"\r\n");
break;
1678 case 2: hdr->
concat(
"\r\n\r");
break;
1679 case 3: hdr->
concat(
'\n');
break;
1689 th.finalize(hdr->
size());
1691 return hdr.release();
1696 if (sock == QORE_INVALID_SOCKET) {
1697 se_not_open(
"Socket",
"recv", xsink);
1703 se_in_op(
"Socket",
"recv", xsink);
1706 se_in_op_thread(
"Socket",
"recv", xsink);
1710 PrivateQoreSocketThroughputHelper th(
this,
false);
1712 qore_size_t bs = bufsize > 0 && bufsize < DEFAULT_SOCKET_BUFSIZE ? bufsize : DEFAULT_SOCKET_BUFSIZE;
1719 rc = brecv(xsink,
"recv", buf, bs, 0, timeout,
false);
1722 printd(5,
"qore_socket_private::recv(" QSD
", %d) bs=" QSD
", br=" QSD
", rc=" QSD
", errno: %d (%s)\n", bufsize, timeout, bs, str->
size(), rc, errno, strerror(errno));
1729 do_read_event(rc, str->
size(), bufsize);
1734 if ((bufsize - str->
size()) < bs)
1735 bs = bufsize - str->
size();
1739 printd(5,
"qore_socket_private::recv() received " QSD
" byte(s), bufsize=" QSD
", strlen=" QSD
" str='%s'\n", str->
size(), bufsize, (str ? str->
strlen() : 0), str ? str->
getBuffer() :
"n/a");
1745 th.finalize(str->
size());
1747 return *xsink ? 0 : str.release();
1752 if (sock == QORE_INVALID_SOCKET) {
1753 se_not_open(
"Socket",
"recv", xsink);
1759 se_in_op(
"Socket",
"recv", xsink);
1762 se_in_op_thread(
"Socket",
"recv", xsink);
1766 PrivateQoreSocketThroughputHelper th(
this,
false);
1772 rc = brecv(xsink,
"recv", buf, DEFAULT_SOCKET_BUFSIZE, 0, timeout,
false);
1779 do_read_event(rc, rc);
1782 if (isDataAvailable(0,
"recv", xsink)) {
1784 rc = brecv(xsink,
"recv", buf, DEFAULT_SOCKET_BUFSIZE, 0, 0,
false);
1790 th.finalize(str->
size());
1796 do_read_event(rc, str->
size());
1797 }
while (isDataAvailable(0,
"recv", xsink));
1800 th.finalize(str->
size());
1806 return str.release();
1813 if (sock == QORE_INVALID_SOCKET) {
1814 se_not_open(
"Socket",
"recvBinary", xsink);
1820 se_in_op(
"Socket",
"recvBinary", xsink);
1823 se_in_op_thread(
"Socket",
"recvBinary", xsink);
1827 PrivateQoreSocketThroughputHelper th(
this,
false);
1829 qore_size_t bs = bufsize > 0 && bufsize < DEFAULT_SOCKET_BUFSIZE ? bufsize : DEFAULT_SOCKET_BUFSIZE;
1835 rc = brecv(xsink,
"recvBinary", buf, bs, 0, timeout);
1844 if ((bufsize - b->size()) < bs)
1845 bs = bufsize - b->size();
1849 th.finalize(b->size());
1858 printd(5,
"qore_socket_private::recvBinary() received " QSD
" byte(s), bufsize=" QSD
", blen=" QSD
"\n", b->size(), bufsize, b->size());
1864 if (sock == QORE_INVALID_SOCKET) {
1865 se_not_open(
"Socket",
"recvBinary", xsink);
1871 se_in_op(
"Socket",
"recvBinary", xsink);
1874 se_in_op_thread(
"Socket",
"recvBinary", xsink);
1878 PrivateQoreSocketThroughputHelper th(
this,
false);
1885 rc = brecv(xsink,
"recvBinary", buf, DEFAULT_SOCKET_BUFSIZE, 0, timeout,
false);
1892 do_read_event(rc, rc);
1895 if (isDataAvailable(0,
"recvBinary", xsink)) {
1897 rc = brecv(xsink,
"recvBinary", buf, DEFAULT_SOCKET_BUFSIZE, 0, 0,
false);
1902 th.finalize(b->size());
1909 do_read_event(rc, b->size());
1910 }
while (isDataAvailable(0,
"recvBinary", xsink));
1913 th.finalize(b->size());
1924 if (sock == QORE_INVALID_SOCKET) {
1925 se_not_open(
"Socket",
"recvToOutputStream", xsink);
1930 se_in_op(
"Socket",
"recvToOutputStream", xsink);
1933 se_in_op_thread(
"Socket",
"recvToOutputStream", xsink);
1937 qore_socket_op_helper oh(
this);
1941 while (size < 0 || br < size) {
1943 int bn = size < 0 ? DEFAULT_SOCKET_BUFSIZE :
QORE_MIN(size - br, DEFAULT_SOCKET_BUFSIZE);
1945 qore_offset_t rc = brecv(xsink,
"recvToOutputStream", buf, bn, 0, timeout);
1954 xsink->
raiseException(
"SOCKET-RECV-ERROR",
"Unexpected end of stream");
1962 os->
write(buf, rc, xsink);
1981 return hdr.release();
1985 qore_offset_t& rc,
int source,
const char* headers_raw_key =
"headers-raw") {
1997 if ((p = (
char*)strstr(buf,
"\r\n"))) {
2000 }
else if ((p = (
char*)strchr(buf,
'\n'))) {
2003 }
else if ((p = (
char*)strchr(buf,
'\r'))) {
2009 xsink->
raiseException(
"SOCKET-HTTP-ERROR",
"invalid header received with embedded nulls in Socket::readHTTPHeader()");
2014 if (!(t1 = (
char*)strstr(buf,
"HTTP/"))) {
2015 xsink->
raiseExceptionArg(
"SOCKET-HTTP-ERROR", hdr.release(),
"missing HTTP version string in first header line in Socket::readHTTPHeader()");
2026 int flags = CHF_PROCESS;
2033 flags |= CHF_HTTP11;
2038 char* t2 = (
char*)strchr(buf + 8,
' ');
2041 if (isdigit(*(t2))) {
2043 if (strlen(t2) > 4) {
2051 char* t2 = (
char*)strchr(buf,
' ');
2056 t1 = strchr(t2,
' ');
2066 flags |= CHF_REQUEST;
2069 bool close = convertHeaderToHash(*h, p, flags, info, &http_exp_chunked_body, headers_raw_key);
2070 do_read_http_header(QORE_EVENT_HTTP_MESSAGE_RECEIVED, *h, source);
2073 if ((flags & CHF_REQUEST) && info)
2089 arg->
setKeyValue(
"send_aborted", send_aborted, xsink);
2090 args->push(arg,
nullptr);
2093 return runCallback(xsink, cname, mname, rv, callback, l, *args);
2098 if (runCallback(xsink, cname, mname, rv, callback, l,
nullptr))
2101 switch (rv->getType()) {
2109 xsink->
raiseException(
"HTTP-TRAILER-ERROR",
"chunked callback returned type '%s'; expecting 'hash' or 'NOTHING'", rv->getTypeName());
2121 args->push(arg,
nullptr);
2124 return runCallback(xsink, cname, mname, rv, callback, l, *args);
2142 if (sock == QORE_INVALID_SOCKET) {
2143 se_not_open(cname, mname, xsink);
2144 return QSE_NOT_OPEN;
2152 assert(!aborted || !(*aborted));
2154 if (sock == QORE_INVALID_SOCKET) {
2155 se_not_open(cname, mname, xsink);
2156 return QSE_NOT_OPEN;
2160 se_in_op(cname, mname, xsink);
2163 se_in_op_thread(cname, mname, xsink);
2167 PrivateQoreSocketThroughputHelper th(
this,
true);
2170 bool nb = (timeout_ms >= 0);
2172 OptionalNonBlockingHelper onbh(*
this, !ssl && nb, xsink);
2176 qore_socket_op_helper oh(
this);
2185 bool data_available = tryReadSocketData(mname, xsink);
2187 if (data_available || *xsink) {
2189 return *xsink ? -1 : 0;
2195 rc = runCallback(xsink, cname, mname, res, send_callback, &l);
2204 switch (res->getType()) {
2234 const char* key = hi.getKey();
2241 do_header(key, buf, li.getValue());
2244 do_header(key, buf, v);
2256 xsink->
raiseException(
"SOCKET-CALLBACK-ERROR",
"HTTP chunked data callback returned type '%s'; expecting one of: 'string', 'binary', 'hash', 'nothing' (or 'NULL')", res->getTypeName());
2267 rc = sendIntern(xsink, cname, mname, buf.
getBuffer(), buf.
size(), timeout_ms, total,
true);
2271 if (aborted && *xsink) {
2272 bool data_available = tryReadSocketData(mname, xsink);
2274 if (data_available) {
2276 return *xsink ? -1 : 0;
2285 if (rc < 0 || sock == QORE_INVALID_SOCKET)
2291 return rc < 0 || sock == QORE_INVALID_SOCKET ? -1 : 0;
2294 DLLLOCAL
int sendIntern(
ExceptionSink* xsink,
const char* cname,
const char* mname,
const char* buf,
qore_size_t size,
int timeout_ms,
int64& total,
bool stream =
false) {
2300 bool nb = (timeout_ms >= 0);
2305 rc = ssl->write(mname, buf + bs, size - bs, timeout_ms, xsink);
2308 rc = ::send(sock, buf + bs, size - bs, 0);
2315 if (nb && (errno == EAGAIN
2317 || errno == EWOULDBLOCK
2320 if (!isWriteFinished(timeout_ms, mname, xsink)) {
2323 se_timeout(
"Socket", mname, timeout_ms, xsink);
2329 if (errno != EINTR) {
2333 xsink->
raiseErrnoException(
"SOCKET-SEND-ERROR", errno,
"error while executing %s::%s()", cname, mname);
2337 if (!stream && errno == EPIPE)
2341 if (!stream && errno == ECONNRESET)
2352 if (rc < 0 || sock == QORE_INVALID_SOCKET)
2357 do_send_event(rc, bs, size);
2366 DLLLOCAL
int send(
ExceptionSink* xsink,
const char* mname,
const char* buf,
qore_size_t size,
int timeout_ms = -1) {
2367 return send(xsink,
"Socket", mname, buf, size, timeout_ms);
2372 DLLLOCAL
int send(
ExceptionSink* xsink,
const char* cname,
const char* mname,
const char* buf,
qore_size_t size,
int timeout_ms = -1) {
2374 if (sock == QORE_INVALID_SOCKET) {
2375 se_not_open(cname, mname, xsink);
2377 return QSE_NOT_OPEN;
2381 se_in_op(cname, mname, xsink);
2384 se_in_op_thread(cname, mname, xsink);
2388 PrivateQoreSocketThroughputHelper th(
this,
true);
2391 bool nb = (timeout_ms >= 0);
2393 OptionalNonBlockingHelper onbh(*
this, !ssl && nb, xsink);
2398 qore_offset_t rc = sendIntern(xsink, cname, mname, buf, size, timeout_ms, total);
2401 return rc < 0 || sock == QORE_INVALID_SOCKET ? rc : 0;
2405 if (sock == QORE_INVALID_SOCKET) {
2406 se_not_open(
"Socket",
"sendFromInputStream", xsink);
2411 se_in_op(
"Socket",
"sendFromInputStream", xsink);
2414 se_in_op_thread(
"Socket",
"sendFromInputStream", xsink);
2418 qore_socket_op_helper oh(
this);
2420 PrivateQoreSocketThroughputHelper th(
this,
true);
2423 bool nb = (timeout >= 0);
2425 OptionalNonBlockingHelper onbh(*
this, !ssl && nb, xsink);
2429 char buf[DEFAULT_SOCKET_BUFSIZE];
2432 while (size < 0 || sent < size) {
2433 int64 toRead = size < 0 ? DEFAULT_SOCKET_BUFSIZE :
QORE_MIN(size - sent, DEFAULT_SOCKET_BUFSIZE);
2437 r = is->
read(buf, toRead, xsink);
2446 xsink->
raiseException(
"SOCKET-SEND-ERROR",
"Unexpected end of stream");
2452 qore_offset_t rc = sendIntern(xsink,
"Socket",
"sendFromInputStream", buf, r, timeout, total);
2462 if (sock == QORE_INVALID_SOCKET) {
2463 se_not_open(
"Socket",
"sendHttpChunkedBodyFromInputStream", xsink);
2468 se_in_op(
"Socket",
"sendHttpChunkedBodyFromInputStream", xsink);
2471 se_in_op_thread(
"Socket",
"sendHttpChunkedBodyFromInputStream", xsink);
2475 qore_socket_op_helper oh(
this);
2477 PrivateQoreSocketThroughputHelper th(
this,
true);
2480 bool nb = (timeout >= 0);
2482 OptionalNonBlockingHelper onbh(*
this, !ssl && nb, xsink);
2488 buf->preallocate(max_chunk_size);
2494 r = is->
read((
void*)buf->getPtr(),
sizeof(max_chunk_size), xsink);
2501 str.
sprintf(
"%x\r\n", (
int)r);
2502 int rc = sendIntern(xsink,
"Socket",
"sendHttpChunkedBodyFromInputStream", str.
c_str(), str.
size(), timeout, total,
true);
2506 bool trailers =
false;
2510 rc = sendIntern(xsink,
"Socket",
"sendHttpChunkedBodyFromInputStream", (
const char*)buf->getPtr(), r, timeout, total,
true);
2513 }
else if (trailer_callback) {
2517 if (runTrailerCallback(xsink,
"Socket",
"sendHttpChunkedBodyFromInputStream", *trailer_callback, l, h))
2521 do_headers(str, *h, 0);
2523 rc = sendIntern(xsink,
"Socket",
"sendHttpChunkedBodyFromInputStream", str.
c_str(), str.
size(), timeout, total,
true);
2534 rc = sendIntern(xsink,
"Socket",
"sendHttpChunkedBodyFromInputStream", str.
c_str(), str.
size(), timeout, total,
true);
2548 if (sock == QORE_INVALID_SOCKET) {
2549 se_not_open(
"Socket",
"sendHttpChunkedBodyTrailer", xsink);
2554 se_in_op(
"Socket",
"sendHttpChunkedBodyTrailer", xsink);
2557 se_in_op_thread(
"Socket",
"sendHttpChunkedBodyTrailer", xsink);
2567 const char* key = hi.getKey();
2572 do_header(key, buf, li.getValue());
2575 do_header(key, buf, v);
2580 sendIntern(xsink,
"Socket",
"sendHttpChunkedBodyTrailer", buf.
getBuffer(), buf.
size(), timeout, total,
true);
2583 DLLLOCAL
int sendHttpMessage(
ExceptionSink* xsink,
QoreHashNode* info,
const char* cname,
const char* mname,
const char* method,
const char* path,
const char* http_version,
const QoreHashNode* headers,
const void *data,
qore_size_t size,
const ResolvedCallReferenceNode* send_callback,
InputStream* is,
size_t max_chunk_size,
const ResolvedCallReferenceNode* trailer_callback,
int source,
int timeout_ms = -1,
QoreThreadLock* l = 0,
bool* aborted = 0) {
2585 assert(!(data && send_callback));
2586 assert(!(data && is));
2587 assert(!(send_callback && is));
2592 hdr.sprintf(
"%s %s HTTP/%s", method, path && path[0] ? path :
"/", http_version);
2598 do_send_http_message(hdr, headers, source);
2602 do_headers(hdr, headers, size && data ? size : 0);
2607 if ((rc = send(xsink, cname, mname, hdr.getBuffer(), hdr.strlen(), timeout_ms)))
2611 return send(xsink, cname, mname, (
char*)data, size, timeout_ms);
2612 else if (send_callback) {
2614 assert(!aborted || !(*aborted));
2615 return sendHttpChunkedWithCallback(xsink, cname, mname, *send_callback, *l, source, timeout_ms, aborted);
2618 assert(!aborted || !(*aborted));
2620 sendHttpChunkedBodyFromInputStream(is, max_chunk_size, timeout_ms, xsink, l, trailer_callback);
2621 return *xsink ? -1 : 0;
2627 DLLLOCAL
int sendHttpResponse(
ExceptionSink* xsink,
const char* cname,
const char* mname,
int code,
const char* desc,
const char* http_version,
const QoreHashNode* headers,
const void *data,
qore_size_t size,
const ResolvedCallReferenceNode* send_callback,
int source,
int timeout_ms = -1,
QoreThreadLock* l = 0,
bool* aborted = 0) {
2628 assert(!(data && send_callback));
2632 hdr.sprintf(
"HTTP/%s %03d %s", http_version, code, desc);
2634 do_send_http_message(hdr, headers, source);
2638 do_headers(hdr, headers, size && data ? size : 0,
true);
2643 if ((rc = send(xsink, cname, mname, hdr.getBuffer(), hdr.strlen(), timeout_ms)))
2647 return send(xsink, cname, mname, (
char*)data, size, timeout_ms);
2648 else if (send_callback) {
2650 assert(!aborted || !(*aborted));
2651 return sendHttpChunkedWithCallback(xsink, cname, mname, *send_callback, *l, source, timeout_ms, aborted);
2660 if (sock == QORE_INVALID_SOCKET) {
2661 se_not_open(cname,
"readHTTPChunkedBodyBinary", xsink);
2666 se_in_op(cname,
"readHTTPChunkedBodyBinary", xsink);
2669 se_in_op_thread(cname,
"readHTTPChunkedBodyBinary", xsink);
2674 if (http_exp_chunked_body)
2675 http_exp_chunked_body =
false;
2677 qore_socket_op_helper oh(
this);
2690 rc = brecv(xsink,
"readHTTPChunkedBodyBinary", buf, 1, 0, timeout,
false);
2694 se_closed(cname,
"readHTTPChunkedBodyBinary", xsink);
2701 if (!state && c ==
'\r')
2703 else if (state && c ==
'\n')
2717 char* p = (
char*)strchr(str.
getBuffer(),
';');
2720 long size = strtol(str.
getBuffer(), 0, 16);
2721 do_chunked_read(QORE_EVENT_HTTP_CHUNK_SIZE, size, str.
strlen(), source);
2727 xsink->
raiseException(
"READ-HTTP-CHUNK-ERROR",
"negative value given for chunk size (%ld)", size);
2734 qore_offset_t bs = size < DEFAULT_SOCKET_BUFSIZE ? size : DEFAULT_SOCKET_BUFSIZE;
2738 rc = brecv(xsink,
"readHTTPChunkedBodyBinary", buf, bs, 0, timeout,
false);
2743 se_closed(cname,
"readHTTPChunkedBodyBinary", xsink);
2750 os->
write(buf, rc, xsink);
2772 rc = brecv(xsink,
"readHTTPChunkedBodyBinary", buf, 2 - br, 0, timeout,
false);
2776 se_closed(cname,
"readHTTPChunkedBodyBinary", xsink);
2783 do_chunked_read(QORE_EVENT_HTTP_CHUNKED_DATA_RECEIVED, size, size + 2, source);
2785 if (recv_callback && !os) {
2786 if (runDataCallback(xsink, cname,
"readHTTPChunkedBodyBinary", *recv_callback, l, *b,
true))
2802 if (!recv_callback && !os)
2808 if (hdr->strlen() >= 2 && hdr->strlen() <= 4)
2809 return recv_callback ? 0 : h.release();
2811 if (recv_callback) {
2814 convertHeaderToHash(*h, (
char*)hdr->c_str(), 0, *info,
nullptr,
"response-headers-raw");
2815 do_read_http_header(QORE_EVENT_HTTP_FOOTERS_RECEIVED, *h, source);
2818 if (recv_callback) {
2819 runHeaderCallback(xsink, cname,
"readHTTPChunkedBodyBinary", *recv_callback, l, h->
empty() ? nullptr : *h,
2820 info.release(),
false, obj);
2831 if (sock == QORE_INVALID_SOCKET) {
2832 se_not_open(cname,
"readHTTPChunkedBody", xsink);
2837 se_in_op(cname,
"readHTTPChunkedBody", xsink);
2840 se_in_op_thread(cname,
"readHTTPChunkedBody", xsink);
2845 if (http_exp_chunked_body)
2846 http_exp_chunked_body =
false;
2848 qore_socket_op_helper oh(
this);
2861 rc = brecv(xsink,
"readHTTPChunkedBody", tbuf, 1, 0, timeout,
false);
2865 se_closed(cname,
"readHTTPChunkedBody", xsink);
2872 if (!state && c ==
'\r')
2874 else if (state && c ==
'\n')
2888 char* p = (
char*)strchr(str.
getBuffer(),
';');
2892 do_chunked_read(QORE_EVENT_HTTP_CHUNK_SIZE, size, str.
strlen(), source);
2898 xsink->
raiseException(
"READ-HTTP-CHUNK-ERROR",
"negative value given for chunk size (%ld)", size);
2908 qore_offset_t bs = size < DEFAULT_SOCKET_BUFSIZE ? size : DEFAULT_SOCKET_BUFSIZE;
2913 rc = brecv(xsink,
"readHTTPChunkedBody", tbuf, bs, 0, timeout,
false);
2917 se_closed(cname,
"readHTTPChunkedBody", xsink);
2922 buf->concat(tbuf, rc);
2938 rc = brecv(xsink,
"readHTTPChunkedBody", tbuf, 2 - br, 0, timeout,
false);
2942 se_closed(cname,
"readHTTPChunkedBody", xsink);
2949 do_chunked_read(QORE_EVENT_HTTP_CHUNKED_DATA_RECEIVED, size, size + 2, source);
2951 if (recv_callback) {
2952 if (runDataCallback(xsink, cname,
"readHTTPChunkedBody", *recv_callback, l, *buf,
true))
2971 if (hdr->strlen() >= 2 && hdr->strlen() <= 4)
2972 return recv_callback ? 0 : h.release();
2974 if (recv_callback) {
2977 convertHeaderToHash(*h, (
char*)hdr->c_str(), 0, *info,
nullptr,
"response-headers-raw");
2978 do_read_http_header(QORE_EVENT_HTTP_FOOTERS_RECEIVED, *h, source);
2981 if (recv_callback) {
2982 runHeaderCallback(xsink, cname,
"readHTTPChunkedBody", *recv_callback, l, h->
empty() ? nullptr : *h,
2983 info.release(),
false, obj);
2990 DLLLOCAL
static void do_accept_encoding(
char* t,
QoreHashNode& info) {
2999 while (*a && *a !=
';' && *a !=
',')
3003 l->push(str.release(),
nullptr);
3013 info.
setKeyValue(
"accept-encoding", l.release(), 0);
3016 DLLLOCAL
bool do_accept_charset(
char* t,
QoreHashNode& info) {
3017 bool acceptcharset =
false;
3032 if (*a ==
'u' || *a ==
'U') {
3034 if (*a ==
't' || *a ==
'T') {
3036 if (*a ==
'f' || *a ==
'F') {
3048 }
else if (*a ==
',') {
3052 }
else if (*a ==
';') {
3061 acceptcharset =
true;
3065 ac->concat(t, div - t);
3070 info.
setKeyValue(
"accept-charset", ac.release(), 0);
3071 acceptcharset =
true;
3075 return acceptcharset;
3080 bool* chunked =
nullptr,
const char* headers_raw_key =
"headers-raw") {
3081 bool close = !(flags & CHF_HTTP11);
3083 const char* senc = 0;
3085 bool acceptcharset =
false;
3093 std::string raw_key;
3098 if ((p = strstr(buf,
"\r\n"))) {
3101 }
else if ((p = strchr(buf,
'\n'))) {
3104 }
else if ((p = strchr(buf,
'\r'))) {
3109 char* t = strchr(buf,
':');
3114 while (t && qore_isblank(*t))
3124 if (flags & CHF_PROCESS) {
3125 if (!strcmp(buf,
"connection")) {
3126 if (flags & CHF_HTTP11) {
3127 if (strcasestr(t,
"close"))
3130 if (strcasestr(t,
"keep-alive"))
3133 }
else if (!strcmp(buf,
"content-type")) {
3134 char* a = strcasestr(t,
"charset=");
3137 char* e = strchr(a + 8,
';');
3141 cs.
concat(a + 8, e - a - 8);
3160 }
while (a > t && (*a ==
' ' || *a ==
';'));
3167 ct->concat(t, a - t + 1);
3173 info->
setKeyValue(
"body-content-type", ct.release(),
nullptr);
3177 info->
setKeyValue(
"body-content-type", val->refSelf(),
nullptr);
3179 }
else if (chunked && !strcmp(buf,
"transfer-encoding") && !strcasecmp(t,
"chunked")) {
3182 if (!strcmp(buf,
"accept-charset"))
3183 acceptcharset = do_accept_charset(t, *info);
3184 else if ((flags & CHF_REQUEST) && !strcmp(buf,
"accept-encoding"))
3185 do_accept_encoding(t, *info);
3190 if (raw_hdr && val) {
3191 val_copy = val->realCopy();
3195 hash_assignment_priv ha(*h, buf);
3196 if (!(*ha).isNothing()) {
3198 if ((*ha).getType() ==
NT_LIST) {
3202 l->
push(ha.swap(l),
nullptr);
3204 l->
push(val.release(),
nullptr);
3206 ha.assign(val.release(), 0);
3210 hash_assignment_priv ha(*raw_hdr, raw_key);
3211 if (!(*ha).isNothing()) {
3213 if ((*ha).getType() ==
NT_LIST) {
3217 l->
push(ha.swap(l),
nullptr);
3219 l->
push(val_copy.release(),
nullptr);
3221 ha.assign(val_copy.release(),
nullptr);
3225 if ((flags & CHF_PROCESS)) {
3229 if (info && !acceptcharset)
3236 DLLLOCAL
int recvix(
const char* meth,
int len,
void* targ,
int timeout_ms,
ExceptionSink* xsink) {
3238 if (sock == QORE_INVALID_SOCKET) {
3239 se_not_open(
"Socket", meth, xsink);
3240 return QSE_NOT_OPEN;
3244 se_in_op(
"Socket", meth, xsink);
3247 se_in_op_thread(
"Socket", meth, xsink);
3251 PrivateQoreSocketThroughputHelper th(
this,
false);
3256 qore_offset_t rc = brecv(xsink, meth, buf, len - br, 0, timeout_ms);
3258 do_read_error(rc, meth, timeout_ms, xsink);
3262 memcpy(targ, buf, rc);
3280 warn_queue->deref(xsink);
3283 tp_warning_bs = 0.0;
3291 if (warning_ms <= 0 && warning_bs <= 0) {
3292 xsink->
raiseException(
"SOCKET-SETWARNINGQUEUE-ERROR",
"Socket::setWarningQueue() at least one of warning ms argument: " QLLD
" and warning B/s argument: " QLLD
" must be greater than zero; to clear, call Socket::clearWarningQueue() with no arguments", warning_ms, warning_bs);
3302 warn_queue->
deref(xsink);
3306 warn_queue = qholder.release();
3307 callback_arg = holder.release();
3308 tl_warning_us = (
int64)warning_ms * 1000;
3309 tp_warning_bs = warning_bs;
3310 tp_us_min = min_ms * 1000;
3313 DLLLOCAL
void getUsageInfo(
QoreHashNode& h, qore_socket_private& s)
const {
3321 h.
setKeyValue(
"bytes_sent", tp_bytes_sent + s.tp_bytes_sent, 0);
3322 h.
setKeyValue(
"bytes_recv", tp_bytes_recv + s.tp_bytes_sent, 0);
3323 h.
setKeyValue(
"us_sent", tp_us_sent + s.tp_us_sent, 0);
3324 h.
setKeyValue(
"us_recv", tp_us_recv + s.tp_us_recv, 0);
3347 DLLLOCAL
void clearStats() {
3354 DLLLOCAL
void doTimeoutWarning(
const char* op,
int64 dt) {
3356 assert(dt > tl_warning_us);
3367 warn_queue->pushAndTakeRef(h);
3370 DLLLOCAL
void doThroughputWarning(
bool send,
int64 bytes,
int64 dt,
double bs) {
3372 assert(bs < tp_warning_bs);
3385 warn_queue->pushAndTakeRef(h);
3388 DLLLOCAL
bool pendingHttpChunkedBody()
const {
3389 return http_exp_chunked_body && sock != QORE_INVALID_SOCKET;
3392 DLLLOCAL
void setSslVerifyMode(
int mode) {
3394 ssl_verify_mode = mode;
3396 ssl->setVerifyMode(ssl_verify_mode, ssl_accept_all_certs);
3399 DLLLOCAL
void acceptAllCertificates(
bool accept_all =
true) {
3400 ssl_accept_all_certs = accept_all;
3402 ssl->setVerifyMode(ssl_verify_mode, ssl_accept_all_certs);
3406 sock.priv->getUsageInfo(h, *s.priv);
3409 DLLLOCAL
static qore_socket_private*
get(
QoreSocket& sock) {
3413 DLLLOCAL
static const qore_socket_private*
get(
const QoreSocket& sock) {
3417 DLLLOCAL
static void captureRemoteCert(X509_STORE_CTX* x509_ctx);
const qore_type_t NT_BINARY
type value for BinaryNode
Definition: node_types.h:49
DLLEXPORT const char * c_str() const
returns the string's buffer; this data should not be changed
Qore's arbitrary-precision number value type, dynamically-allocated only, reference counted...
Definition: QoreNumberNode.h:51
defines string encoding functions in Qore
Definition: QoreEncoding.h:83
a helper class for getting socket origination information
Definition: QoreSocket.h:73
DLLEXPORT bool empty() const
returns true if the string is empty, false if not
This is the hash or associative list container type in Qore, dynamically allocated only...
Definition: QoreHashNode.h:50
DLLEXPORT const QoreEncoding * QCS_DEFAULT
the default encoding for the Qore library
DLLEXPORT AbstractQoreNode * raiseExceptionArg(const char *err, QoreValue arg, const char *fmt,...)
appends a Qore-language exception to the list, and sets the 'arg' member (this object takes over the ...
DLLEXPORT void set(const char *str, const QoreEncoding *new_qorecharset=QCS_DEFAULT)
copies the c-string passed and sets the value of the string and its encoding
DLLEXPORT int gettid() noexcept
returns the current TID number
const qore_type_t NT_LIST
type value for QoreListNode
Definition: node_types.h:50
DLLEXPORT bool empty() const
returns true if empty
DLLEXPORT char * giveBuffer()
returns the character buffer and leaves the QoreString empty, the caller owns the memory returned (mu...
DLLEXPORT int sprintf(const char *fmt,...)
this will concatentate a formatted string to the existing string according to the format string and t...
The base class for all value and parse types in Qore expression trees.
Definition: AbstractQoreNode.h:54
const qore_type_t NT_NOTHING
type value for QoreNothingNode
Definition: node_types.h:42
DLLEXPORT int setKeyValue(const char *key, QoreValue value, ExceptionSink *xsink)
sets the value of "key" to "value"
size_t qore_size_t
used for sizes (same range as a pointer)
Definition: common.h:73
DLLEXPORT int getInfo(ExceptionSink *xsink, const char *node, const char *service, int family=Q_AF_UNSPEC, int flags=0, int socktype=Q_SOCK_STREAM, int protocol=0)
get address info with the given parameters, if any errors occur, a Qore-language exception is thrown ...
DLLEXPORT QoreHashNode * hashRefSelf() const
returns "this" with an incremented reference count
DLLLOCAL T * release()
releases the pointer to the caller
Definition: ReferenceHolder.h:91
DLLLOCAL hashdecl addrinfo * getAddrInfo() const
returns the hashdecl addrinfo * being managed (may by 0)
Definition: QoreNet.h:159
DLLEXPORT AbstractQoreNode * raiseException(const char *err, const char *fmt,...)
appends a Qore-language exception to the list
DLLEXPORT AbstractQoreNode * raiseErrnoException(const char *err, int en, const char *fmt,...)
appends a Qore-language exception to the list and appends the result of strerror(errno) to the descri...
provides an interface to getaddrinfo
Definition: QoreNet.h:132
DLLEXPORT int push(QoreValue val, ExceptionSink *xsink)
adds a value to the list
const qore_type_t NT_NULL
type value for QoreNullNode
Definition: node_types.h:48
Interface for private data of output streams.
Definition: OutputStream.h:44
DLLEXPORT AbstractQoreNode * refSelf() const
returns "this" with an incremented reference count
Qore's string type supported by the QoreEncoding class.
Definition: QoreString.h:81
Qore's string value type, reference counted, dynamically-allocated only.
Definition: QoreStringNode.h:50
virtual void write(const void *ptr, int64 count, ExceptionSink *xsink)=0
Writes bytes to the output stream.
DLLEXPORT const char * getBuffer() const
returns the string's buffer; this data should not be changed
static DLLEXPORT const char * getFamilyName(int address_family)
returns the name of the address family as a string (ie AF_INET = "ipv4", etc)
DLLEXPORT void concat(const QoreString *str, ExceptionSink *xsink)
concatenates a string and converts encodings if necessary
This is the list container type in Qore, dynamically allocated only, reference counted.
Definition: QoreListNode.h:52
DLLEXPORT QoreEncodingManager QEM
the QoreEncodingManager object
DLLEXPORT qore_size_t strlen() const
returns number of bytes in the string (not including the null pointer)
const qore_type_t NT_HASH
type value for QoreHashNode
Definition: node_types.h:51
static DLLEXPORT const QoreEncoding * findCreate(const char *name)
finds an encoding if it exists (also looks up against alias names) and creates a new one if it doesn'...
virtual DLLLOCAL QoreValue execValue(const QoreListNode *args, ExceptionSink *xsink) const =0
pure virtual function for executing the function reference
The main value class in Qore, designed to be passed by value.
Definition: QoreValue.h:262
const qore_type_t NT_STRING
type value for QoreStringNode
Definition: node_types.h:45
provides access to sockets using Qore data structures
Definition: QoreSocket.h:126
virtual DLLEXPORT AbstractQoreNode * realCopy() const =0
returns a copy of the object; the caller owns the reference count
the implementation of Qore's object data type, reference counted, dynamically-allocated only ...
Definition: QoreObject.h:61
DLLEXPORT bool empty() const
returns true if the hash has no members, false if not
DLLEXPORT void clear()
frees any managed memory and sets the size to 0
container for holding Qore-language exception information and also for registering a "thread_exit" ca...
Definition: ExceptionSink.h:46
const qore_type_t NT_FLOAT
type value for floating-point values (QoreValue only)
Definition: node_types.h:44
DLLEXPORT QoreStringNode * q_strerror(int errnum)
returns the error string as a QoreStringNode
const qore_type_t NT_BOOLEAN
type value for bools (QoreValue only)
Definition: node_types.h:47
static void strtolower(char *str)
convert a string to lower-case in place
Definition: QoreLib.h:270
const qore_type_t NT_INT
type value for integers (QoreValue only)
Definition: node_types.h:43
DLLEXPORT void deref(ExceptionSink *xsink)
decrements the reference count and calls derefImpl() if there_can_be_only_one is false, otherwise does nothing
long long int64
64bit integer type, cannot use int64_t here since it breaks the API on some 64-bit systems due to equ...
Definition: common.h:260
DLLEXPORT const void * getPtr() const
returns the pointer to the data
intptr_t qore_offset_t
used for offsets that could be negative
Definition: common.h:76
DLLEXPORT void trim(const char *chars=0)
remove leading and trailing whitespace or other characters
base class for resolved call references
Definition: CallReferenceNode.h:105
#define QORE_MIN(a, b)
macro to return the minimum of 2 numbers
Definition: QoreLib.h:538
constant iterator class for QoreHashNode, to be only created on the stack
Definition: QoreHashNode.h:563
provides a mutually-exclusive thread lock
Definition: QoreThreadLock.h:47
static DLLEXPORT QoreStringNode * getAddressDesc(int address_family, const char *addr)
returns a descriptive string for the address family and an address string (ie AF_INET6, "::1" = "ipv6[::1]")
const qore_type_t NT_NUMBER
type value for QoreNumberNode
Definition: node_types.h:53
DLLEXPORT void setValue(const char *key, QoreValue val, ExceptionSink *xsink)
sets the value of the given member to the given value
DLLEXPORT qore_size_t size() const
returns the number of bytes in the object
DLLEXPORT void clear()
reset string to zero length; memory is not deallocated; string encoding does not change ...
DLLEXPORT qore_size_t size() const
returns number of bytes in the string (not including the null pointer)
holds an object and dereferences it in the destructor
Definition: QoreValue.h:452
DLLEXPORT QoreValue refSelf() const
references the contained value if type == QV_Node, returns itself
DLLEXPORT void append(const void *nptr, qore_size_t size)
resizes the object and appends a copy of the data passed to the object
DLLEXPORT void discard(ExceptionSink *xsink)
dereferences any contained AbstractQoreNode pointer and sets to 0; does not modify other values ...
provides a safe and exception-safe way to release and re-acquire locks in Qore, only to be used on th...
Definition: QoreThreadLock.h:187
holds arbitrary binary data
Definition: BinaryNode.h:41
For use on the stack only: iterates through elements of a const QoreListNode.
Definition: QoreListNode.h:566