X-Git-Url: http://git.openwrt.org/?a=blobdiff_plain;f=ubusd_main.c;h=adbd2932be3d22a1317ca8a33be50fb30662f7cd;hb=HEAD;hp=81868c1482bcc7e6029000e7437d7bb9dd9b7705;hpb=c413be9b376c685e4a5b04b1d0d9d716dfbeb460;p=project%2Fubus.git diff --git a/ubusd_main.c b/ubusd_main.c index 81868c1..adbd293 100644 --- a/ubusd_main.c +++ b/ubusd_main.c @@ -6,37 +6,22 @@ #include #include +#include #ifdef FreeBSD #include #endif +#include #include #include #include "ubusd.h" -static struct ubus_msg_buf *ubus_msg_head(struct ubus_client *cl) -{ - return cl->tx_queue[cl->txq_cur]; -} - -static void ubus_msg_dequeue(struct ubus_client *cl) -{ - struct ubus_msg_buf *ub = ubus_msg_head(cl); - - if (!ub) - return; - - ubus_msg_free(ub); - cl->txq_ofs = 0; - cl->tx_queue[cl->txq_cur] = NULL; - cl->txq_cur = (cl->txq_cur + 1) % ARRAY_SIZE(cl->tx_queue); -} - static void handle_client_disconnect(struct ubus_client *cl) { - while (ubus_msg_head(cl)) - ubus_msg_dequeue(cl); + struct ubus_msg_buf_list *ubl, *ubl2; + list_for_each_entry_safe(ubl, ubl2, &cl->tx_queue, list) + ubus_msg_list_free(ubl); ubusd_monitor_disconnect(cl); ubusd_proto_free_client(cl); @@ -47,30 +32,57 @@ static void handle_client_disconnect(struct ubus_client *cl) free(cl); } +static void ubus_client_cmd_free(struct ubus_client_cmd *cmd) +{ + list_del(&cmd->list); + ubus_msg_free(cmd->msg); + free(cmd); +} + +static void ubus_client_cmd_queue_process(struct ubus_client *cl) +{ + struct ubus_client_cmd *cmd, *tmp; + + list_for_each_entry_safe(cmd, tmp, &cl->cmd_queue, list) { + int ret = ubusd_cmd_lookup(cl, cmd); + + /* Stop if the last command caused buffering again */ + if (ret == -2) + break; + + ubus_client_cmd_free(cmd); + } +} + static void client_cb(struct uloop_fd *sock, unsigned int events) { struct ubus_client *cl = container_of(sock, struct ubus_client, sock); + uint8_t fd_buf[CMSG_SPACE(sizeof(int))] = { 0 }; + struct msghdr msghdr = { 0 }; struct ubus_msg_buf *ub; + struct ubus_msg_buf_list *ubl, *ubl2; static struct iovec iov; - static struct { - int fd; - struct cmsghdr h; - } fd_buf = { - .h = { - .cmsg_type = SCM_RIGHTS, - .cmsg_level = SOL_SOCKET, - .cmsg_len = sizeof(fd_buf), - } - }; - struct msghdr msghdr = { - .msg_iov = &iov, - .msg_iovlen = 1, - }; + struct cmsghdr *cmsg; + int *pfd; + + msghdr.msg_iov = &iov, + msghdr.msg_iovlen = 1, + msghdr.msg_control = fd_buf; + msghdr.msg_controllen = sizeof(fd_buf); + + cmsg = CMSG_FIRSTHDR(&msghdr); + cmsg->cmsg_type = SCM_RIGHTS; + cmsg->cmsg_level = SOL_SOCKET; + cmsg->cmsg_len = CMSG_LEN(sizeof(int)); + + pfd = (int *) CMSG_DATA(cmsg); + msghdr.msg_controllen = cmsg->cmsg_len; /* first try to tx more pending data */ - while ((ub = ubus_msg_head(cl))) { + list_for_each_entry_safe(ubl, ubl2, &cl->tx_queue, list) { ssize_t written; + ub = ubl->msg; written = ubus_msg_writev(sock->fd, ub, cl->txq_ofs); if (written < 0) { switch(errno) { @@ -84,30 +96,37 @@ static void client_cb(struct uloop_fd *sock, unsigned int events) } cl->txq_ofs += written; + cl->txq_len -= written; if (cl->txq_ofs < ub->len + sizeof(ub->hdr)) break; - ubus_msg_dequeue(cl); + cl->txq_ofs = 0; + ubus_msg_list_free(ubl); } - /* prevent further ULOOP_WRITE events if we don't have data - * to send anymore */ - if (!ubus_msg_head(cl) && (events & ULOOP_WRITE)) - uloop_fd_add(sock, ULOOP_READ | ULOOP_EDGE_TRIGGER); + if (list_empty(&cl->tx_queue) && (events & ULOOP_WRITE)) { + /* Process queued commands */ + ubus_client_cmd_queue_process(cl); + + /* prevent further ULOOP_WRITE events if we don't have data + * to send anymore */ + if (list_empty(&cl->tx_queue)) + uloop_fd_add(sock, ULOOP_READ | ULOOP_EDGE_TRIGGER); + } retry: if (!sock->eof && cl->pending_msg_offset < (int) sizeof(cl->hdrbuf)) { int offset = cl->pending_msg_offset; int bytes; - fd_buf.fd = -1; + *pfd = -1; iov.iov_base = ((char *) &cl->hdrbuf) + offset; iov.iov_len = sizeof(cl->hdrbuf) - offset; if (cl->pending_msg_fd < 0) { - msghdr.msg_control = &fd_buf; - msghdr.msg_controllen = sizeof(fd_buf); + msghdr.msg_control = fd_buf; + msghdr.msg_controllen = cmsg->cmsg_len; } else { msghdr.msg_control = NULL; msghdr.msg_controllen = 0; @@ -117,13 +136,15 @@ retry: if (bytes < 0) goto out; - if (fd_buf.fd >= 0) - cl->pending_msg_fd = fd_buf.fd; + if (*pfd >= 0) + cl->pending_msg_fd = *pfd; cl->pending_msg_offset += bytes; if (cl->pending_msg_offset < (int) sizeof(cl->hdrbuf)) goto out; + if (blob_raw_len(&cl->hdrbuf.data) < sizeof(struct blob_attr)) + goto disconnect; if (blob_pad_len(&cl->hdrbuf.data) > UBUS_MAX_MSGLEN) goto disconnect; @@ -166,7 +187,7 @@ retry: } out: - if (!sock->eof || ubus_msg_head(cl)) + if (!sock->eof || !list_empty(&cl->tx_queue)) return; disconnect: @@ -226,6 +247,21 @@ static void sighup_handler(int sig) ubusd_acl_load(); } +static void mkdir_sockdir() +{ + char *ubus_sock_dir, *tmp; + + ubus_sock_dir = strdup(UBUS_UNIX_SOCKET); + tmp = strrchr(ubus_sock_dir, '/'); + if (tmp) { + *tmp = '\0'; + mkdir(ubus_sock_dir, 0755); + } + free(ubus_sock_dir); +} + +#include + int main(int argc, char **argv) { const char *ubus_socket = UBUS_UNIX_SOCKET; @@ -235,6 +271,7 @@ int main(int argc, char **argv) signal(SIGPIPE, SIG_IGN); signal(SIGHUP, sighup_handler); + ulog_open(ULOG_KMSG | ULOG_SYSLOG, LOG_DAEMON, "ubusd"); openlog("ubusd", LOG_PID, LOG_DAEMON); uloop_init(); @@ -251,6 +288,7 @@ int main(int argc, char **argv) } } + mkdir_sockdir(); unlink(ubus_socket); umask(0111); server_fd.fd = usock(USOCK_UNIX | USOCK_SERVER | USOCK_NONBLOCK, ubus_socket, NULL);