




免费预览已结束,剩余25页可下载查看
下载本文档
版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领
文档简介
基于netlink的内核态与用户态异步并发消息处理模型 用户态采用select模型,初始化时建立多个netlinksocket,绑定完成之后,向内核发送握手消息,这样内核可以将已经建立的连接记住,以便后续选择可用的连接发送数据。初始化和握手完成之后,由内核主动向用户态发送数据,用户态主线程在各个socket句柄上面等待读事件的到来,当检测到读事件时,向线程池提交数据读取和处理任务。这样模拟一个连接池和事件分发模型,保证内核数据及时被读取到用户态程序并处理,能做到并发。而内核态的netlink在接收数据时本身就是以系统调用的方式提供给业务层的发送接口,因此本身就是异步的,性能不是问题。内核态收到数据时,只需要提交给一个内核线程去处理即可。原型代码如下:共用头文件#define NETLINK_TEST 21#define MAX_DATA_LEN (768)#define MAX_PROCESS_COUNT 100#define MAX_PAYLOAD 1024 #define MAX_PID_COUNT MAX_PROCESS_COUNT#define MAX_REC_DATA_LEN 1024#define STATIC_PEROID 1024*500#define MSG_COUNT 10000#define TRUE 1用户态#include #include #include #include #include #include #include #include asm/types.h#include #include #include #include #include conf.hstruct endpoint unsigned int pid; struct sockaddr_nl src_addr; struct sockaddr_nl dest_addr; int sock_fd; struct msghdr msg; *endpoits = NULL;int listen_user(void);int handwithknl(void);int close_user(void);int doselect(struct timeval wait);void* threadProc(void* ed);void* sendThreadProc(void* arg);void flushcount(int len);void staticsout(void);void startSendThreads(void);static int maxfd = 0;static fd_set rset;static struct timeval tmout;unsigned char* readbuffer;static struct timeval tmnow;static struct timeval oldtime;static long reccount = 0;pthread_mutex_t mutex;static long readbytes = 0;static long readnum = 0;static long sendbytes = 0;int main(int argc, char* argv) endpoits = (struct endpoint*)malloc(sizeof(struct endpoint) * MAX_PID_COUNT); readbuffer = (unsigned char*)malloc(MAX_REC_DATA_LEN); / create an netlink socket and bind. listen_user(); if (pthread_mutex_init(&mutex, NULL) != 0 ) printf(Init metux error.n); return -1; / send a handshake msg to the knl. let the knl to see this client. handwithknl(); / sleep(6); / startSendThreads(); gettimeofday(&oldtime, NULL); tmout.tv_sec = 1; tmout.tv_usec = 0; / wait for event from the knl to dispach. while (1) doselect(tmout); if (readbytes STATIC_PEROID) staticsout(); / sleep(2); / close the socket. close_user(); return 0;int listen_user(void) int pidcount; struct endpoint* ed = NULL; struct nlmsghdr* nlh = NULL; unsigned int pid = getpid(); for (pidcount = 0; pidcount != MAX_PID_COUNT; pidcount+) ed = endpoits + pidcount; memset(void*)ed, 0, sizeof(struct endpoint); ed-sock_fd = socket(AF_NETLINK, SOCK_RAW, NETLINK_TEST); ed-pid = pid + pidcount; if (0 = ed-pid) pid = +ed-pid; ed-src_addr.nl_family = AF_NETLINK; ed-src_addr.nl_groups = 0; ed-src_addr.nl_pid = ed-pid; /TODO may be the src_addr.nl_pid is already in used, the bind will return a nonezero value. bind(ed-sock_fd, (struct sockaddr *) &ed-src_addr, sizeof(ed-src_addr); ed-dest_addr.nl_family = AF_NETLINK; ed-dest_addr.nl_pid = 0; ed-dest_addr.nl_groups = 0; /* Fill in the netlink message payload */ ed-msg.msg_name = (void *) &ed-dest_addr; ed-msg.msg_namelen = sizeof(ed-dest_addr); printf(init the socket %d of pid %d successful.n, ed-sock_fd, ed-pid); usleep(10000); return 0;int handwithknl(void) int pidcount; struct endpoint* ed = NULL; struct nlmsghdr* nlh = NULL; for (pidcount = 0; pidcount != MAX_PID_COUNT; pidcount+) ed = endpoits + pidcount; ed-msg.msg_iovlen = 1; ed-msg.msg_iov = malloc(sizeof(struct iovec); ed-msg.msg_iov-iov_base = malloc(NLMSG_SPACE(MAX_PAYLOAD); nlh = (struct nlmsghdr*)ed-msg.msg_iov-iov_base; nlh-nlmsg_len = NLMSG_SPACE(MAX_PAYLOAD); nlh-nlmsg_pid = ed-pid; nlh-nlmsg_flags = 0; ed-msg.msg_iov-iov_len = nlh-nlmsg_len; snprintf(char*)NLMSG_DATA(nlh), MAX_PAYLOAD - 1, Hello knl! This is %d!, ed-pid); / printf( Sending message from . .n, ed-pid); sendmsg(ed-sock_fd, &ed-msg, 0); if (ed-sock_fd maxfd) maxfd = ed-sock_fd; FD_SET(ed-sock_fd, &rset); / pthread_t tid; / if (0 = pthread_create(&tid, NULL, &threadProc, (void*)ed) / / printf(create a thread %u successful for the pid: %d. n, tid, ed-pid); / return 0;int close_user(void) int pidcount; for (pidcount = 0; pidcount != MAX_PID_COUNT; pidcount+) close(endpoitspidcount.sock_fd); return 0;int doselect(struct timeval wait) int pidcount; int selcount = 0; struct endpoint* ed = NULL; struct nlmsghdr* nlh = NULL; selcount = select(maxfd + 1, &rset, NULL, NULL, &wait); if (selcount = 0) return 0; else if (selcount sock_fd, &rset) int count = 0; int msglen = -1; int readstatus = 0; memset(readbuffer, 0, MAX_REC_DATA_LEN); ed-msg.msg_iov-iov_base = (void*)readbuffer; nlh = (struct nlmsghdr*)ed-msg.msg_iov-iov_base; while (TRUE) memset(readbuffer, 0, MAX_REC_DATA_LEN); readstatus = recvmsg(ed-sock_fd, &ed-msg, 0); if (readstatus = -1) printf(recieved error! %d n, ed-sock_fd); break; else if (readstatus = 0) printf(recieved error peer orderly shutdown! %d n, ed-sock_fd); break; else count += readstatus; / printf(count %dn, count); if (msglen = -1 & count = 16) msglen = nlh-nlmsg_len; / printf(msg len is: %un, msglen); if (msglen != -1 & count = msglen) / printf(success read a msg: %un, count); / pData = (int*)NLMSG_DATA(nlh); / for (i = 0; i != MAX_DATA_LEN/4; i+) / / printf(%d, *(pData + i); / / printf(n); readnum+; / printf(readnum is %dn, readnum); break; / printf(received %u bytes.n, count); / printf(received %d bytes, the peer pid is %d, the local pid is %d.n, nlh-nlmsg_len, nlh-nlmsg_pid, ed-pid); flushcount(count); FD_CLR(ed-sock_fd, &rset); FD_SET(ed-sock_fd, &rset); / printf(select one time.n); return 0; void* sendThreadProc(void* arg) int readcount = 0; struct endpoint* ed; struct nlmsghdr* nlh; fd_set rset_c; struct timeval wait_time; int val; int i; ed = (struct endpoint*)arg; ed-msg.msg_iov-iov_base = malloc(MAX_DATA_LEN); wait_time.tv_sec = 2; wait_time.tv_usec = 0; printf(send data thread %d start!n, ed-pid); for (i = 0; i != MSG_COUNT; i+) nlh = (struct nlmsghdr*)ed-msg.msg_iov-iov_base; nlh-nlmsg_len = NLMSG_SPACE(MAX_DATA_LEN); nlh-nlmsg_pid = ed-pid; nlh-nlmsg_flags = 0; ed-msg.msg_iov-iov_len = nlh-nlmsg_len; readcount = sendmsg(ed-sock_fd, &ed-msg, MSG_DONTWAIT); flushcount(readcount); printf(thread %d end!n, ed-pid);void* threadProc(void* arg) int readcount = 0; struct endpoint* ed; struct nlmsghdr* nlh; fd_set rset_c; struct timeval wait_time; int val; ed = (struct endpoint*)arg; ed-msg.msg_iov-iov_base = malloc(MAX_DATA_LEN); wait_time.tv_sec = 2; wait_time.tv_usec = 0; printf(thread %d start!n, ed-pid); for (;) FD_ZERO(&rset_c); FD_SET(ed-sock_fd, &rset_c); select(ed-sock_fd + 1, &rset_c, NULL, NULL, &wait_time); if (FD_ISSET(ed-sock_fd, &rset_c) recvmsg(ed-sock_fd, &ed-msg, 0); nlh = (struct nlmsghdr*)ed-msg.msg_iov-iov_base; flushcount(nlh-nlmsg_len); printf(thread %d end!n, ed-pid);void flushcount(int len) int val; val = pthread_mutex_lock(&mutex); if(val != 0) printf(lock error. n); pthread_mutex_unlock(&mutex); return; if (len 0) readbytes += len; readnum+; pthread_mutex_unlock(&mutex);void staticsout() int millsec; int val; val = pthread_mutex_lock(&mutex); if(val != 0) printf(lock error. n); pthread_mutex_unlock(&mutex); return; if (readbytes STATIC_PEROID) gettimeofday(&tmnow, NULL); millsec = (tmnow.tv_sec - oldtime.tv_sec) * 1000 + (tmnow.tv_usec - oldtime.tv_usec) / 1000; printf(received %d Kbytes, consumed time is: %dms, speed is: %5.3fK/s, %5.2f/s.n, readbytes / 1024, millsec, (float)(readbytes / 1024) * 1000 / millsec, (float)(readnum * 1000 / millsec); gettimeofday(&oldtime, NULL); readbytes = 0; readnum = 0; pthread_mutex_unlock(&mutex);void startSendThreads(void) int pidcount; struct endpoint* ed = NULL; struct nlmsghdr* nlh = NULL; for (pidcount = 0; pidcount != MAX_PID_COUNT; pidcount+) pthread_t tid; ed = endpoits + pidcount; if (0 = pthread_create(&tid, NULL, &sendThreadProc, (void*)ed) printf(create a sendthread %u successful for the pid: %d. n, tid, ed-pid); 内核态#include #include #include #include #include #include #include #include conf.h#ifndef SLEEP_MILLI_SEC #define SLEEP_MILLI_SEC(nMilliSec) do long timeout = (nMilliSec) * HZ / 1000; while(timeout 0) timeout = schedule_timeout(timeout); while(0); #endif struct sock* nl_sk = NULL;EXPORT_SYMBOL_GPL(nl_sk);static struct task_struct* task_testMAX_PROCESS_COUNT;static DECLARE_WAIT_QUEUE_HEAD(myevent_waitqueue); static u32 pidsMAX_PID_COUNT = 0;static int pidindex = 0;static int readBytes = 0;static int childDataThread(void* index) int threadindex = *(int*)index); struct sk_buff* skb; struct nlmsghdr* nlh; int rc; int len = NLMSG_SPACE(MAX_DATA_LEN); int a = MSG_COUNT; unsigned char randomindex; wait_queue_head_t timeout_wq; int devi = MAX_PID_COUNT / MAX_PROCESS_COUNT; init_waitqueue_head(&timeout_wq); printk(start thread %d.n, threadindex); allow_signal(SIGKILL); while (a- & !kthread_should_stop() int* pData; int i = 0; skb = alloc_skb(len, GFP_ATOMIC); if (!skb) printk(KERN_ERR net_link: allocate failed.n); return -1; nlh = nlmsg_put(skb, 0, 0, 0, MAX_DATA_LEN, 0); NETLINK_CB(skb).pid = 0; / pData = (int*)NLMSG_DATA(nlh); / for (i = 0; i != MAX_DATA_LEN/4; i+) / / *(pData + i) = i; / get_random_bytes(&randomindex, 1); randomindex = randomindex % devi + threadindex * devi; / printk(radmonindex is: %d, threadindex is %dn, randomindex, threadindex); if (pidsrandomindex != 0) / printk(net_link: going to send, peer pid is: %d, a is: %d.n, pidsrandomindex, a); rc = netlink_unicast(nl_sk, skb, pidsrandomindex, MSG_DONTWAIT); if (rc len = NLMSG_SPACE(0) nlh = nlmsg_hdr(skb); / printk(net_link: recv %s.n, (char *) NLMSG_DATA(nlh); if (pidindex nlmsg_pid; if (pidindex = MAX_PID_COUNT - 1) int i; for (i = 0; i != MAX_PROCESS_COUNT; i+) wake_up_process(task_testi); printk(wake up the thread %d.n, i); pidindex+; readBytes += nlh-nlmsg_len; if (readBytes STATIC_PEROID) printk(received %d bytes.n, readBytes); readBytes = 0; kfree_skb(skb); return;static int init_netlink(void) nl_sk = netlink_kernel_create(&init_net, NETLINK_TEST, 0, nl_data_ready, NULL, THIS_MODULE); if (!nl_sk) printk(KERN_ERR net_link: Cannot create netlink socket.n); return -EIO; printk(net_link: create socket ok.n); return 0;int init_thread(void) int i = 0; char processName64 = 0; for (i = 0; i != MAX_PROCESS_COUNT; i+) void* data = kmalloc(sizeof(int), GFP_ATOMIC); *(int*)data = i; snprintf(processName, 63, childDataThread-%d, i); task_testi = kthread_create(childDataThread, data, processName); if (IS_ERR(task_testi) return PTR_ERR(task_testi); printk(init thread (%d) ok!n, i); return 0;int knl_init(void) init_netlink(); init_thread(); return 0;void stop_kthreads(void) int i; for (i = 0; i != MAX_PROCESS_COUNT; i+) kthread_stop(task_testi); void knl_exit(void) stop_kthreads(); if (nl_sk != NULL) sock_release(nl_sk-sk_socket); printk(net_link: remove ok.n);module_exit(knl_exit);module_init(knl_init);MODULE_LICENSE(GPL);MODULE_AUTHOR(r);MakefileMODULE_NAME := knlobj-m += $(MODULE_NAME).oKERNELDIR ?= /lib/modules/$(shell uname -r)/buildPWD := $(shell pwd)all: $(MAKE) -C $(KERNELDIR) M=$(PWD) gcc -g -o usr -lpthread usr.cclean: rm -f *.ko *.o *.cmd usr $(MODULE_NAME).mod.c Module.symversin:clean rm all insmod knl.korm: rmmod knl.kosp: cat /proc/net/knlru: ./usrsm: dmesg -c初步测试的性能结果为:init the socket 188 of pid 24056 successful.init the socket 189 of pid 24057 successful.init the socket 190 of pid 24058 successful.init the socket 191 of pid 24059 successful.init the socket 192 of pid 24060 successful.init the socket 193 of pid 24061 successful.init the socket 194 of pid 24062 successful.init the socket 195 of pid 24063 successful.init the socket 196 of pid 24064 successful.init the socket 197 of pid 24065 successful.init the socket 198 of pid 24066 successful.init the socket 199 of pid 24067 successful.init the socket 200 of pid 24068 successful.init the socket 201 of pid 24069 successful.init the socket 202 of pid 24070 successful.received 30 Mbytes, consumed time is: 10227ms, speed is: 2.933M/s, 4012.00/s.received 30 Mbytes, consumed time is: 10062ms, speed is: 2.982M/s, 4013.00/s.received 30 Mbytes, consumed time is: 10052ms, speed is: 2.984M/s, 4012.00/s.received 30 Mbytes, consumed time is: 10069ms, speed is: 2.979M/s, 4013.00/s.received 30 Mbytes, consumed time is: 10113ms, speed is: 2.966M/s, 4012.00/s.received 30 Mbytes, consumed time is: 10071ms, speed is: 2.979M/s, 4012.00/s.received 30 Mbytes, consumed time is: 10289ms, speed is: 2.916M/s, 4014.00/s.received 30 Mbytes, consumed time is: 10247ms, speed is: 2.928M/s, 4013.00/s.received 30 Mbytes, consumed time is: 10347ms, speed is: 2.899M/s, 4013.00/s.received 30 Mbytes, consumed time is: 10340ms, speed is: 2.901M/s, 4013.00/s.received 30 Mbytes, consumed time is: 10107ms, speed is: 2.968M/s, 4012.00/s.received 30 Mbytes, consumed time is: 10267ms, speed is: 2.922M/s, 4013.00/s.内核态的运行结果有较多发送失败的打印:583 start thread 193.584 start thread 194.585 start thread 93.586 start thread 195.587 start thread 196.588 start thread 197.589 start thread 198.590 start thread 199.591 start thread 92.592 start thread 91.593 start thr
温馨提示
- 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
- 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
- 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
- 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
- 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
- 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
- 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。
最新文档
- 2025年滨州邹平怀远学校教师考前自测高频考点模拟试题带答案详解
- 【中考专题】2026年中考数学专项提优复习:方程与方程组【附答案】
- 2025汽车融资租赁合同范例
- 2025昆仑数智科技有限责任公司春季高校毕业生招聘15人模拟试卷及完整答案详解1套
- 2025湖南怀化国际陆港辰溪港区发展有限责任公司招聘工作人员拟聘用人员考前自测高频考点模拟试题及答案详解(考点梳理)
- 2025年河北廊坊市农林科学院公开选聘博士研究生1名考前自测高频考点模拟试题及参考答案详解1套
- 2025第二季度贵州安顺市平坝区美农科技有限公司招聘9人考前自测高频考点模拟试题及答案详解(必刷)
- 2025北京首都师范大学实验小学招聘2人模拟试卷附答案详解
- 2025杭州市钱塘区教育局所属事业单位高层次人才引进15人模拟试卷及参考答案详解一套
- 2025福建厦门市集美区实验小学顶岗教师招聘1人考前自测高频考点模拟试题及答案详解(易错题)
- 光伏土建培训课件
- 爱心义卖班会课课件
- 化验员职业技能培训考试题库及答案(含各题型)
- 2025年广东省中考历史试题卷(含答案详解)
- 大米直播促销活动方案
- 阴挺的中医护理
- 2025-2030中国便携式卫星通信终端行业前景动态与投资战略研究报告
- 过敏反应的防治与治疗讲课件
- 2025至2030年中国石油石化装备制造行业市场现状分析及投资前景研判报告
- 物流运输规章管理制度
- 中药熏洗法试题及答案
评论
0/150
提交评论