1 | // SPDX-License-Identifier: GPL-2.0-only |
2 | /****************************************************************************** |
3 | ******************************************************************************* |
4 | ** |
5 | ** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved. |
6 | ** Copyright (C) 2004-2009 Red Hat, Inc. All rights reserved. |
7 | ** |
8 | ** |
9 | ******************************************************************************* |
10 | ******************************************************************************/ |
11 | |
12 | /* |
13 | * lowcomms.c |
14 | * |
15 | * This is the "low-level" comms layer. |
16 | * |
17 | * It is responsible for sending/receiving messages |
18 | * from other nodes in the cluster. |
19 | * |
20 | * Cluster nodes are referred to by their nodeids. nodeids are |
21 | * simply 32 bit numbers to the locking module - if they need to |
22 | * be expanded for the cluster infrastructure then that is its |
23 | * responsibility. It is this layer's |
24 | * responsibility to resolve these into IP address or |
25 | * whatever it needs for inter-node communication. |
26 | * |
27 | * The comms level is two kernel threads that deal mainly with |
28 | * the receiving of messages from other nodes and passing them |
29 | * up to the mid-level comms layer (which understands the |
30 | * message format) for execution by the locking core, and |
31 | * a send thread which does all the setting up of connections |
32 | * to remote nodes and the sending of data. Threads are not allowed |
33 | * to send their own data because it may cause them to wait in times |
34 | * of high load. Also, this way, the sending thread can collect together |
35 | * messages bound for one node and send them in one block. |
36 | * |
37 | * lowcomms will choose to use either TCP or SCTP as its transport layer |
38 | * depending on the configuration variable 'protocol'. This should be set |
39 | * to 0 (default) for TCP or 1 for SCTP. It should be configured using a |
40 | * cluster-wide mechanism as it must be the same on all nodes of the cluster |
41 | * for the DLM to function. |
42 | * |
43 | */ |
44 | |
45 | #include <asm/ioctls.h> |
46 | #include <net/sock.h> |
47 | #include <net/tcp.h> |
48 | #include <linux/pagemap.h> |
49 | #include <linux/file.h> |
50 | #include <linux/mutex.h> |
51 | #include <linux/sctp.h> |
52 | #include <linux/slab.h> |
53 | #include <net/sctp/sctp.h> |
54 | #include <net/ipv6.h> |
55 | |
56 | #include <trace/events/dlm.h> |
57 | #include <trace/events/sock.h> |
58 | |
59 | #include "dlm_internal.h" |
60 | #include "lowcomms.h" |
61 | #include "midcomms.h" |
62 | #include "memory.h" |
63 | #include "config.h" |
64 | |
65 | #define DLM_SHUTDOWN_WAIT_TIMEOUT msecs_to_jiffies(5000) |
66 | #define DLM_MAX_PROCESS_BUFFERS 24 |
67 | #define NEEDED_RMEM (4*1024*1024) |
68 | |
69 | struct connection { |
70 | struct socket *sock; /* NULL if not connected */ |
71 | uint32_t nodeid; /* So we know who we are in the list */ |
72 | /* this semaphore is used to allow parallel recv/send in read |
73 | * lock mode. When we release a sock we need to held the write lock. |
74 | * |
75 | * However this is locking code and not nice. When we remove the |
76 | * othercon handling we can look into other mechanism to synchronize |
77 | * io handling to call sock_release() at the right time. |
78 | */ |
79 | struct rw_semaphore sock_lock; |
80 | unsigned long flags; |
81 | #define CF_APP_LIMITED 0 |
82 | #define CF_RECV_PENDING 1 |
83 | #define CF_SEND_PENDING 2 |
84 | #define CF_RECV_INTR 3 |
85 | #define CF_IO_STOP 4 |
86 | #define CF_IS_OTHERCON 5 |
87 | struct list_head writequeue; /* List of outgoing writequeue_entries */ |
88 | spinlock_t writequeue_lock; |
89 | int retries; |
90 | struct hlist_node list; |
91 | /* due some connect()/accept() races we currently have this cross over |
92 | * connection attempt second connection for one node. |
93 | * |
94 | * There is a solution to avoid the race by introducing a connect |
95 | * rule as e.g. our_nodeid > nodeid_to_connect who is allowed to |
96 | * connect. Otherside can connect but will only be considered that |
97 | * the other side wants to have a reconnect. |
98 | * |
99 | * However changing to this behaviour will break backwards compatible. |
100 | * In a DLM protocol major version upgrade we should remove this! |
101 | */ |
102 | struct connection *othercon; |
103 | struct work_struct rwork; /* receive worker */ |
104 | struct work_struct swork; /* send worker */ |
105 | wait_queue_head_t shutdown_wait; |
106 | unsigned char rx_leftover_buf[DLM_MAX_SOCKET_BUFSIZE]; |
107 | int rx_leftover; |
108 | int mark; |
109 | int addr_count; |
110 | int curr_addr_index; |
111 | struct sockaddr_storage addr[DLM_MAX_ADDR_COUNT]; |
112 | spinlock_t addrs_lock; |
113 | struct rcu_head rcu; |
114 | }; |
115 | #define sock2con(x) ((struct connection *)(x)->sk_user_data) |
116 | |
117 | struct listen_connection { |
118 | struct socket *sock; |
119 | struct work_struct rwork; |
120 | }; |
121 | |
122 | #define DLM_WQ_REMAIN_BYTES(e) (PAGE_SIZE - e->end) |
123 | #define DLM_WQ_LENGTH_BYTES(e) (e->end - e->offset) |
124 | |
125 | /* An entry waiting to be sent */ |
126 | struct writequeue_entry { |
127 | struct list_head list; |
128 | struct page *page; |
129 | int offset; |
130 | int len; |
131 | int end; |
132 | int users; |
133 | bool dirty; |
134 | struct connection *con; |
135 | struct list_head msgs; |
136 | struct kref ref; |
137 | }; |
138 | |
139 | struct dlm_msg { |
140 | struct writequeue_entry *entry; |
141 | struct dlm_msg *orig_msg; |
142 | bool retransmit; |
143 | void *ppc; |
144 | int len; |
145 | int idx; /* new()/commit() idx exchange */ |
146 | |
147 | struct list_head list; |
148 | struct kref ref; |
149 | }; |
150 | |
151 | struct processqueue_entry { |
152 | unsigned char *buf; |
153 | int nodeid; |
154 | int buflen; |
155 | |
156 | struct list_head list; |
157 | }; |
158 | |
159 | struct dlm_proto_ops { |
160 | bool try_new_addr; |
161 | const char *name; |
162 | int proto; |
163 | |
164 | int (*connect)(struct connection *con, struct socket *sock, |
165 | struct sockaddr *addr, int addr_len); |
166 | void (*sockopts)(struct socket *sock); |
167 | int (*bind)(struct socket *sock); |
168 | int (*listen_validate)(void); |
169 | void (*listen_sockopts)(struct socket *sock); |
170 | int (*listen_bind)(struct socket *sock); |
171 | }; |
172 | |
173 | static struct listen_sock_callbacks { |
174 | void (*sk_error_report)(struct sock *); |
175 | void (*sk_data_ready)(struct sock *); |
176 | void (*sk_state_change)(struct sock *); |
177 | void (*sk_write_space)(struct sock *); |
178 | } listen_sock; |
179 | |
180 | static struct listen_connection listen_con; |
181 | static struct sockaddr_storage dlm_local_addr[DLM_MAX_ADDR_COUNT]; |
182 | static int dlm_local_count; |
183 | |
184 | /* Work queues */ |
185 | static struct workqueue_struct *io_workqueue; |
186 | static struct workqueue_struct *process_workqueue; |
187 | |
188 | static struct hlist_head connection_hash[CONN_HASH_SIZE]; |
189 | static DEFINE_SPINLOCK(connections_lock); |
190 | DEFINE_STATIC_SRCU(connections_srcu); |
191 | |
192 | static const struct dlm_proto_ops *dlm_proto_ops; |
193 | |
194 | #define DLM_IO_SUCCESS 0 |
195 | #define DLM_IO_END 1 |
196 | #define DLM_IO_EOF 2 |
197 | #define DLM_IO_RESCHED 3 |
198 | #define DLM_IO_FLUSH 4 |
199 | |
200 | static void process_recv_sockets(struct work_struct *work); |
201 | static void process_send_sockets(struct work_struct *work); |
202 | static void process_dlm_messages(struct work_struct *work); |
203 | |
204 | static DECLARE_WORK(process_work, process_dlm_messages); |
205 | static DEFINE_SPINLOCK(processqueue_lock); |
206 | static bool process_dlm_messages_pending; |
207 | static atomic_t processqueue_count; |
208 | static LIST_HEAD(processqueue); |
209 | |
210 | bool dlm_lowcomms_is_running(void) |
211 | { |
212 | return !!listen_con.sock; |
213 | } |
214 | |
215 | static void lowcomms_queue_swork(struct connection *con) |
216 | { |
217 | assert_spin_locked(&con->writequeue_lock); |
218 | |
219 | if (!test_bit(CF_IO_STOP, &con->flags) && |
220 | !test_bit(CF_APP_LIMITED, &con->flags) && |
221 | !test_and_set_bit(CF_SEND_PENDING, addr: &con->flags)) |
222 | queue_work(wq: io_workqueue, work: &con->swork); |
223 | } |
224 | |
225 | static void lowcomms_queue_rwork(struct connection *con) |
226 | { |
227 | #ifdef CONFIG_LOCKDEP |
228 | WARN_ON_ONCE(!lockdep_sock_is_held(con->sock->sk)); |
229 | #endif |
230 | |
231 | if (!test_bit(CF_IO_STOP, &con->flags) && |
232 | !test_and_set_bit(CF_RECV_PENDING, addr: &con->flags)) |
233 | queue_work(wq: io_workqueue, work: &con->rwork); |
234 | } |
235 | |
236 | static void writequeue_entry_ctor(void *data) |
237 | { |
238 | struct writequeue_entry *entry = data; |
239 | |
240 | INIT_LIST_HEAD(list: &entry->msgs); |
241 | } |
242 | |
243 | struct kmem_cache *dlm_lowcomms_writequeue_cache_create(void) |
244 | { |
245 | return kmem_cache_create(name: "dlm_writequeue" , size: sizeof(struct writequeue_entry), |
246 | align: 0, flags: 0, ctor: writequeue_entry_ctor); |
247 | } |
248 | |
249 | struct kmem_cache *dlm_lowcomms_msg_cache_create(void) |
250 | { |
251 | return kmem_cache_create(name: "dlm_msg" , size: sizeof(struct dlm_msg), align: 0, flags: 0, NULL); |
252 | } |
253 | |
254 | /* need to held writequeue_lock */ |
255 | static struct writequeue_entry *con_next_wq(struct connection *con) |
256 | { |
257 | struct writequeue_entry *e; |
258 | |
259 | e = list_first_entry_or_null(&con->writequeue, struct writequeue_entry, |
260 | list); |
261 | /* if len is zero nothing is to send, if there are users filling |
262 | * buffers we wait until the users are done so we can send more. |
263 | */ |
264 | if (!e || e->users || e->len == 0) |
265 | return NULL; |
266 | |
267 | return e; |
268 | } |
269 | |
270 | static struct connection *__find_con(int nodeid, int r) |
271 | { |
272 | struct connection *con; |
273 | |
274 | hlist_for_each_entry_rcu(con, &connection_hash[r], list) { |
275 | if (con->nodeid == nodeid) |
276 | return con; |
277 | } |
278 | |
279 | return NULL; |
280 | } |
281 | |
282 | static void dlm_con_init(struct connection *con, int nodeid) |
283 | { |
284 | con->nodeid = nodeid; |
285 | init_rwsem(&con->sock_lock); |
286 | INIT_LIST_HEAD(list: &con->writequeue); |
287 | spin_lock_init(&con->writequeue_lock); |
288 | INIT_WORK(&con->swork, process_send_sockets); |
289 | INIT_WORK(&con->rwork, process_recv_sockets); |
290 | spin_lock_init(&con->addrs_lock); |
291 | init_waitqueue_head(&con->shutdown_wait); |
292 | } |
293 | |
294 | /* |
295 | * If 'allocation' is zero then we don't attempt to create a new |
296 | * connection structure for this node. |
297 | */ |
298 | static struct connection *nodeid2con(int nodeid, gfp_t alloc) |
299 | { |
300 | struct connection *con, *tmp; |
301 | int r; |
302 | |
303 | r = nodeid_hash(nodeid); |
304 | con = __find_con(nodeid, r); |
305 | if (con || !alloc) |
306 | return con; |
307 | |
308 | con = kzalloc(size: sizeof(*con), flags: alloc); |
309 | if (!con) |
310 | return NULL; |
311 | |
312 | dlm_con_init(con, nodeid); |
313 | |
314 | spin_lock(lock: &connections_lock); |
315 | /* Because multiple workqueues/threads calls this function it can |
316 | * race on multiple cpu's. Instead of locking hot path __find_con() |
317 | * we just check in rare cases of recently added nodes again |
318 | * under protection of connections_lock. If this is the case we |
319 | * abort our connection creation and return the existing connection. |
320 | */ |
321 | tmp = __find_con(nodeid, r); |
322 | if (tmp) { |
323 | spin_unlock(lock: &connections_lock); |
324 | kfree(objp: con); |
325 | return tmp; |
326 | } |
327 | |
328 | hlist_add_head_rcu(n: &con->list, h: &connection_hash[r]); |
329 | spin_unlock(lock: &connections_lock); |
330 | |
331 | return con; |
332 | } |
333 | |
334 | static int addr_compare(const struct sockaddr_storage *x, |
335 | const struct sockaddr_storage *y) |
336 | { |
337 | switch (x->ss_family) { |
338 | case AF_INET: { |
339 | struct sockaddr_in *sinx = (struct sockaddr_in *)x; |
340 | struct sockaddr_in *siny = (struct sockaddr_in *)y; |
341 | if (sinx->sin_addr.s_addr != siny->sin_addr.s_addr) |
342 | return 0; |
343 | if (sinx->sin_port != siny->sin_port) |
344 | return 0; |
345 | break; |
346 | } |
347 | case AF_INET6: { |
348 | struct sockaddr_in6 *sinx = (struct sockaddr_in6 *)x; |
349 | struct sockaddr_in6 *siny = (struct sockaddr_in6 *)y; |
350 | if (!ipv6_addr_equal(a1: &sinx->sin6_addr, a2: &siny->sin6_addr)) |
351 | return 0; |
352 | if (sinx->sin6_port != siny->sin6_port) |
353 | return 0; |
354 | break; |
355 | } |
356 | default: |
357 | return 0; |
358 | } |
359 | return 1; |
360 | } |
361 | |
362 | static int nodeid_to_addr(int nodeid, struct sockaddr_storage *sas_out, |
363 | struct sockaddr *sa_out, bool try_new_addr, |
364 | unsigned int *mark) |
365 | { |
366 | struct sockaddr_storage sas; |
367 | struct connection *con; |
368 | int idx; |
369 | |
370 | if (!dlm_local_count) |
371 | return -1; |
372 | |
373 | idx = srcu_read_lock(ssp: &connections_srcu); |
374 | con = nodeid2con(nodeid, alloc: 0); |
375 | if (!con) { |
376 | srcu_read_unlock(ssp: &connections_srcu, idx); |
377 | return -ENOENT; |
378 | } |
379 | |
380 | spin_lock(lock: &con->addrs_lock); |
381 | if (!con->addr_count) { |
382 | spin_unlock(lock: &con->addrs_lock); |
383 | srcu_read_unlock(ssp: &connections_srcu, idx); |
384 | return -ENOENT; |
385 | } |
386 | |
387 | memcpy(&sas, &con->addr[con->curr_addr_index], |
388 | sizeof(struct sockaddr_storage)); |
389 | |
390 | if (try_new_addr) { |
391 | con->curr_addr_index++; |
392 | if (con->curr_addr_index == con->addr_count) |
393 | con->curr_addr_index = 0; |
394 | } |
395 | |
396 | *mark = con->mark; |
397 | spin_unlock(lock: &con->addrs_lock); |
398 | |
399 | if (sas_out) |
400 | memcpy(sas_out, &sas, sizeof(struct sockaddr_storage)); |
401 | |
402 | if (!sa_out) { |
403 | srcu_read_unlock(ssp: &connections_srcu, idx); |
404 | return 0; |
405 | } |
406 | |
407 | if (dlm_local_addr[0].ss_family == AF_INET) { |
408 | struct sockaddr_in *in4 = (struct sockaddr_in *) &sas; |
409 | struct sockaddr_in *ret4 = (struct sockaddr_in *) sa_out; |
410 | ret4->sin_addr.s_addr = in4->sin_addr.s_addr; |
411 | } else { |
412 | struct sockaddr_in6 *in6 = (struct sockaddr_in6 *) &sas; |
413 | struct sockaddr_in6 *ret6 = (struct sockaddr_in6 *) sa_out; |
414 | ret6->sin6_addr = in6->sin6_addr; |
415 | } |
416 | |
417 | srcu_read_unlock(ssp: &connections_srcu, idx); |
418 | return 0; |
419 | } |
420 | |
421 | static int addr_to_nodeid(struct sockaddr_storage *addr, int *nodeid, |
422 | unsigned int *mark) |
423 | { |
424 | struct connection *con; |
425 | int i, idx, addr_i; |
426 | |
427 | idx = srcu_read_lock(ssp: &connections_srcu); |
428 | for (i = 0; i < CONN_HASH_SIZE; i++) { |
429 | hlist_for_each_entry_rcu(con, &connection_hash[i], list) { |
430 | WARN_ON_ONCE(!con->addr_count); |
431 | |
432 | spin_lock(lock: &con->addrs_lock); |
433 | for (addr_i = 0; addr_i < con->addr_count; addr_i++) { |
434 | if (addr_compare(x: &con->addr[addr_i], y: addr)) { |
435 | *nodeid = con->nodeid; |
436 | *mark = con->mark; |
437 | spin_unlock(lock: &con->addrs_lock); |
438 | srcu_read_unlock(ssp: &connections_srcu, idx); |
439 | return 0; |
440 | } |
441 | } |
442 | spin_unlock(lock: &con->addrs_lock); |
443 | } |
444 | } |
445 | srcu_read_unlock(ssp: &connections_srcu, idx); |
446 | |
447 | return -ENOENT; |
448 | } |
449 | |
450 | static bool dlm_lowcomms_con_has_addr(const struct connection *con, |
451 | const struct sockaddr_storage *addr) |
452 | { |
453 | int i; |
454 | |
455 | for (i = 0; i < con->addr_count; i++) { |
456 | if (addr_compare(x: &con->addr[i], y: addr)) |
457 | return true; |
458 | } |
459 | |
460 | return false; |
461 | } |
462 | |
463 | int dlm_lowcomms_addr(int nodeid, struct sockaddr_storage *addr, int len) |
464 | { |
465 | struct connection *con; |
466 | bool ret, idx; |
467 | |
468 | idx = srcu_read_lock(ssp: &connections_srcu); |
469 | con = nodeid2con(nodeid, GFP_NOFS); |
470 | if (!con) { |
471 | srcu_read_unlock(ssp: &connections_srcu, idx); |
472 | return -ENOMEM; |
473 | } |
474 | |
475 | spin_lock(lock: &con->addrs_lock); |
476 | if (!con->addr_count) { |
477 | memcpy(&con->addr[0], addr, sizeof(*addr)); |
478 | con->addr_count = 1; |
479 | con->mark = dlm_config.ci_mark; |
480 | spin_unlock(lock: &con->addrs_lock); |
481 | srcu_read_unlock(ssp: &connections_srcu, idx); |
482 | return 0; |
483 | } |
484 | |
485 | ret = dlm_lowcomms_con_has_addr(con, addr); |
486 | if (ret) { |
487 | spin_unlock(lock: &con->addrs_lock); |
488 | srcu_read_unlock(ssp: &connections_srcu, idx); |
489 | return -EEXIST; |
490 | } |
491 | |
492 | if (con->addr_count >= DLM_MAX_ADDR_COUNT) { |
493 | spin_unlock(lock: &con->addrs_lock); |
494 | srcu_read_unlock(ssp: &connections_srcu, idx); |
495 | return -ENOSPC; |
496 | } |
497 | |
498 | memcpy(&con->addr[con->addr_count++], addr, sizeof(*addr)); |
499 | srcu_read_unlock(ssp: &connections_srcu, idx); |
500 | spin_unlock(lock: &con->addrs_lock); |
501 | return 0; |
502 | } |
503 | |
504 | /* Data available on socket or listen socket received a connect */ |
505 | static void lowcomms_data_ready(struct sock *sk) |
506 | { |
507 | struct connection *con = sock2con(sk); |
508 | |
509 | trace_sk_data_ready(sk); |
510 | |
511 | set_bit(CF_RECV_INTR, addr: &con->flags); |
512 | lowcomms_queue_rwork(con); |
513 | } |
514 | |
515 | static void lowcomms_write_space(struct sock *sk) |
516 | { |
517 | struct connection *con = sock2con(sk); |
518 | |
519 | clear_bit(SOCK_NOSPACE, addr: &con->sock->flags); |
520 | |
521 | spin_lock_bh(lock: &con->writequeue_lock); |
522 | if (test_and_clear_bit(CF_APP_LIMITED, addr: &con->flags)) { |
523 | con->sock->sk->sk_write_pending--; |
524 | clear_bit(SOCKWQ_ASYNC_NOSPACE, addr: &con->sock->flags); |
525 | } |
526 | |
527 | lowcomms_queue_swork(con); |
528 | spin_unlock_bh(lock: &con->writequeue_lock); |
529 | } |
530 | |
531 | static void lowcomms_state_change(struct sock *sk) |
532 | { |
533 | /* SCTP layer is not calling sk_data_ready when the connection |
534 | * is done, so we catch the signal through here. |
535 | */ |
536 | if (sk->sk_shutdown == RCV_SHUTDOWN) |
537 | lowcomms_data_ready(sk); |
538 | } |
539 | |
540 | static void lowcomms_listen_data_ready(struct sock *sk) |
541 | { |
542 | trace_sk_data_ready(sk); |
543 | |
544 | queue_work(wq: io_workqueue, work: &listen_con.rwork); |
545 | } |
546 | |
547 | int dlm_lowcomms_connect_node(int nodeid) |
548 | { |
549 | struct connection *con; |
550 | int idx; |
551 | |
552 | idx = srcu_read_lock(ssp: &connections_srcu); |
553 | con = nodeid2con(nodeid, alloc: 0); |
554 | if (WARN_ON_ONCE(!con)) { |
555 | srcu_read_unlock(ssp: &connections_srcu, idx); |
556 | return -ENOENT; |
557 | } |
558 | |
559 | down_read(sem: &con->sock_lock); |
560 | if (!con->sock) { |
561 | spin_lock_bh(lock: &con->writequeue_lock); |
562 | lowcomms_queue_swork(con); |
563 | spin_unlock_bh(lock: &con->writequeue_lock); |
564 | } |
565 | up_read(sem: &con->sock_lock); |
566 | srcu_read_unlock(ssp: &connections_srcu, idx); |
567 | |
568 | cond_resched(); |
569 | return 0; |
570 | } |
571 | |
572 | int dlm_lowcomms_nodes_set_mark(int nodeid, unsigned int mark) |
573 | { |
574 | struct connection *con; |
575 | int idx; |
576 | |
577 | idx = srcu_read_lock(ssp: &connections_srcu); |
578 | con = nodeid2con(nodeid, alloc: 0); |
579 | if (!con) { |
580 | srcu_read_unlock(ssp: &connections_srcu, idx); |
581 | return -ENOENT; |
582 | } |
583 | |
584 | spin_lock(lock: &con->addrs_lock); |
585 | con->mark = mark; |
586 | spin_unlock(lock: &con->addrs_lock); |
587 | srcu_read_unlock(ssp: &connections_srcu, idx); |
588 | return 0; |
589 | } |
590 | |
591 | static void lowcomms_error_report(struct sock *sk) |
592 | { |
593 | struct connection *con = sock2con(sk); |
594 | struct inet_sock *inet; |
595 | |
596 | inet = inet_sk(sk); |
597 | switch (sk->sk_family) { |
598 | case AF_INET: |
599 | printk_ratelimited(KERN_ERR "dlm: node %d: socket error " |
600 | "sending to node %d at %pI4, dport %d, " |
601 | "sk_err=%d/%d\n" , dlm_our_nodeid(), |
602 | con->nodeid, &inet->inet_daddr, |
603 | ntohs(inet->inet_dport), sk->sk_err, |
604 | READ_ONCE(sk->sk_err_soft)); |
605 | break; |
606 | #if IS_ENABLED(CONFIG_IPV6) |
607 | case AF_INET6: |
608 | printk_ratelimited(KERN_ERR "dlm: node %d: socket error " |
609 | "sending to node %d at %pI6c, " |
610 | "dport %d, sk_err=%d/%d\n" , dlm_our_nodeid(), |
611 | con->nodeid, &sk->sk_v6_daddr, |
612 | ntohs(inet->inet_dport), sk->sk_err, |
613 | READ_ONCE(sk->sk_err_soft)); |
614 | break; |
615 | #endif |
616 | default: |
617 | printk_ratelimited(KERN_ERR "dlm: node %d: socket error " |
618 | "invalid socket family %d set, " |
619 | "sk_err=%d/%d\n" , dlm_our_nodeid(), |
620 | sk->sk_family, sk->sk_err, |
621 | READ_ONCE(sk->sk_err_soft)); |
622 | break; |
623 | } |
624 | |
625 | dlm_midcomms_unack_msg_resend(nodeid: con->nodeid); |
626 | |
627 | listen_sock.sk_error_report(sk); |
628 | } |
629 | |
630 | static void restore_callbacks(struct sock *sk) |
631 | { |
632 | #ifdef CONFIG_LOCKDEP |
633 | WARN_ON_ONCE(!lockdep_sock_is_held(sk)); |
634 | #endif |
635 | |
636 | sk->sk_user_data = NULL; |
637 | sk->sk_data_ready = listen_sock.sk_data_ready; |
638 | sk->sk_state_change = listen_sock.sk_state_change; |
639 | sk->sk_write_space = listen_sock.sk_write_space; |
640 | sk->sk_error_report = listen_sock.sk_error_report; |
641 | } |
642 | |
643 | /* Make a socket active */ |
644 | static void add_sock(struct socket *sock, struct connection *con) |
645 | { |
646 | struct sock *sk = sock->sk; |
647 | |
648 | lock_sock(sk); |
649 | con->sock = sock; |
650 | |
651 | sk->sk_user_data = con; |
652 | sk->sk_data_ready = lowcomms_data_ready; |
653 | sk->sk_write_space = lowcomms_write_space; |
654 | if (dlm_config.ci_protocol == DLM_PROTO_SCTP) |
655 | sk->sk_state_change = lowcomms_state_change; |
656 | sk->sk_allocation = GFP_NOFS; |
657 | sk->sk_use_task_frag = false; |
658 | sk->sk_error_report = lowcomms_error_report; |
659 | release_sock(sk); |
660 | } |
661 | |
662 | /* Add the port number to an IPv6 or 4 sockaddr and return the address |
663 | length */ |
664 | static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port, |
665 | int *addr_len) |
666 | { |
667 | saddr->ss_family = dlm_local_addr[0].ss_family; |
668 | if (saddr->ss_family == AF_INET) { |
669 | struct sockaddr_in *in4_addr = (struct sockaddr_in *)saddr; |
670 | in4_addr->sin_port = cpu_to_be16(port); |
671 | *addr_len = sizeof(struct sockaddr_in); |
672 | memset(&in4_addr->sin_zero, 0, sizeof(in4_addr->sin_zero)); |
673 | } else { |
674 | struct sockaddr_in6 *in6_addr = (struct sockaddr_in6 *)saddr; |
675 | in6_addr->sin6_port = cpu_to_be16(port); |
676 | *addr_len = sizeof(struct sockaddr_in6); |
677 | } |
678 | memset((char *)saddr + *addr_len, 0, sizeof(struct sockaddr_storage) - *addr_len); |
679 | } |
680 | |
681 | static void dlm_page_release(struct kref *kref) |
682 | { |
683 | struct writequeue_entry *e = container_of(kref, struct writequeue_entry, |
684 | ref); |
685 | |
686 | __free_page(e->page); |
687 | dlm_free_writequeue(writequeue: e); |
688 | } |
689 | |
690 | static void dlm_msg_release(struct kref *kref) |
691 | { |
692 | struct dlm_msg *msg = container_of(kref, struct dlm_msg, ref); |
693 | |
694 | kref_put(kref: &msg->entry->ref, release: dlm_page_release); |
695 | dlm_free_msg(msg); |
696 | } |
697 | |
698 | static void free_entry(struct writequeue_entry *e) |
699 | { |
700 | struct dlm_msg *msg, *tmp; |
701 | |
702 | list_for_each_entry_safe(msg, tmp, &e->msgs, list) { |
703 | if (msg->orig_msg) { |
704 | msg->orig_msg->retransmit = false; |
705 | kref_put(kref: &msg->orig_msg->ref, release: dlm_msg_release); |
706 | } |
707 | |
708 | list_del(entry: &msg->list); |
709 | kref_put(kref: &msg->ref, release: dlm_msg_release); |
710 | } |
711 | |
712 | list_del(entry: &e->list); |
713 | kref_put(kref: &e->ref, release: dlm_page_release); |
714 | } |
715 | |
716 | static void dlm_close_sock(struct socket **sock) |
717 | { |
718 | lock_sock(sk: (*sock)->sk); |
719 | restore_callbacks(sk: (*sock)->sk); |
720 | release_sock(sk: (*sock)->sk); |
721 | |
722 | sock_release(sock: *sock); |
723 | *sock = NULL; |
724 | } |
725 | |
726 | static void allow_connection_io(struct connection *con) |
727 | { |
728 | if (con->othercon) |
729 | clear_bit(CF_IO_STOP, addr: &con->othercon->flags); |
730 | clear_bit(CF_IO_STOP, addr: &con->flags); |
731 | } |
732 | |
733 | static void stop_connection_io(struct connection *con) |
734 | { |
735 | if (con->othercon) |
736 | stop_connection_io(con: con->othercon); |
737 | |
738 | spin_lock_bh(lock: &con->writequeue_lock); |
739 | set_bit(CF_IO_STOP, addr: &con->flags); |
740 | spin_unlock_bh(lock: &con->writequeue_lock); |
741 | |
742 | down_write(sem: &con->sock_lock); |
743 | if (con->sock) { |
744 | lock_sock(sk: con->sock->sk); |
745 | restore_callbacks(sk: con->sock->sk); |
746 | release_sock(sk: con->sock->sk); |
747 | } |
748 | up_write(sem: &con->sock_lock); |
749 | |
750 | cancel_work_sync(work: &con->swork); |
751 | cancel_work_sync(work: &con->rwork); |
752 | } |
753 | |
754 | /* Close a remote connection and tidy up */ |
755 | static void close_connection(struct connection *con, bool and_other) |
756 | { |
757 | struct writequeue_entry *e; |
758 | |
759 | if (con->othercon && and_other) |
760 | close_connection(con: con->othercon, and_other: false); |
761 | |
762 | down_write(sem: &con->sock_lock); |
763 | if (!con->sock) { |
764 | up_write(sem: &con->sock_lock); |
765 | return; |
766 | } |
767 | |
768 | dlm_close_sock(sock: &con->sock); |
769 | |
770 | /* if we send a writequeue entry only a half way, we drop the |
771 | * whole entry because reconnection and that we not start of the |
772 | * middle of a msg which will confuse the other end. |
773 | * |
774 | * we can always drop messages because retransmits, but what we |
775 | * cannot allow is to transmit half messages which may be processed |
776 | * at the other side. |
777 | * |
778 | * our policy is to start on a clean state when disconnects, we don't |
779 | * know what's send/received on transport layer in this case. |
780 | */ |
781 | spin_lock_bh(lock: &con->writequeue_lock); |
782 | if (!list_empty(head: &con->writequeue)) { |
783 | e = list_first_entry(&con->writequeue, struct writequeue_entry, |
784 | list); |
785 | if (e->dirty) |
786 | free_entry(e); |
787 | } |
788 | spin_unlock_bh(lock: &con->writequeue_lock); |
789 | |
790 | con->rx_leftover = 0; |
791 | con->retries = 0; |
792 | clear_bit(CF_APP_LIMITED, addr: &con->flags); |
793 | clear_bit(CF_RECV_PENDING, addr: &con->flags); |
794 | clear_bit(CF_SEND_PENDING, addr: &con->flags); |
795 | up_write(sem: &con->sock_lock); |
796 | } |
797 | |
798 | static void shutdown_connection(struct connection *con, bool and_other) |
799 | { |
800 | int ret; |
801 | |
802 | if (con->othercon && and_other) |
803 | shutdown_connection(con: con->othercon, and_other: false); |
804 | |
805 | flush_workqueue(io_workqueue); |
806 | down_read(sem: &con->sock_lock); |
807 | /* nothing to shutdown */ |
808 | if (!con->sock) { |
809 | up_read(sem: &con->sock_lock); |
810 | return; |
811 | } |
812 | |
813 | ret = kernel_sock_shutdown(sock: con->sock, how: SHUT_WR); |
814 | up_read(sem: &con->sock_lock); |
815 | if (ret) { |
816 | log_print("Connection %p failed to shutdown: %d will force close" , |
817 | con, ret); |
818 | goto force_close; |
819 | } else { |
820 | ret = wait_event_timeout(con->shutdown_wait, !con->sock, |
821 | DLM_SHUTDOWN_WAIT_TIMEOUT); |
822 | if (ret == 0) { |
823 | log_print("Connection %p shutdown timed out, will force close" , |
824 | con); |
825 | goto force_close; |
826 | } |
827 | } |
828 | |
829 | return; |
830 | |
831 | force_close: |
832 | close_connection(con, and_other: false); |
833 | } |
834 | |
835 | static struct processqueue_entry *new_processqueue_entry(int nodeid, |
836 | int buflen) |
837 | { |
838 | struct processqueue_entry *pentry; |
839 | |
840 | pentry = kmalloc(size: sizeof(*pentry), GFP_NOFS); |
841 | if (!pentry) |
842 | return NULL; |
843 | |
844 | pentry->buf = kmalloc(size: buflen, GFP_NOFS); |
845 | if (!pentry->buf) { |
846 | kfree(objp: pentry); |
847 | return NULL; |
848 | } |
849 | |
850 | pentry->nodeid = nodeid; |
851 | return pentry; |
852 | } |
853 | |
854 | static void free_processqueue_entry(struct processqueue_entry *pentry) |
855 | { |
856 | kfree(objp: pentry->buf); |
857 | kfree(objp: pentry); |
858 | } |
859 | |
860 | struct dlm_processed_nodes { |
861 | int nodeid; |
862 | |
863 | struct list_head list; |
864 | }; |
865 | |
866 | static void process_dlm_messages(struct work_struct *work) |
867 | { |
868 | struct processqueue_entry *pentry; |
869 | |
870 | spin_lock(lock: &processqueue_lock); |
871 | pentry = list_first_entry_or_null(&processqueue, |
872 | struct processqueue_entry, list); |
873 | if (WARN_ON_ONCE(!pentry)) { |
874 | process_dlm_messages_pending = false; |
875 | spin_unlock(lock: &processqueue_lock); |
876 | return; |
877 | } |
878 | |
879 | list_del(entry: &pentry->list); |
880 | atomic_dec(v: &processqueue_count); |
881 | spin_unlock(lock: &processqueue_lock); |
882 | |
883 | for (;;) { |
884 | dlm_process_incoming_buffer(nodeid: pentry->nodeid, buf: pentry->buf, |
885 | buflen: pentry->buflen); |
886 | free_processqueue_entry(pentry); |
887 | |
888 | spin_lock(lock: &processqueue_lock); |
889 | pentry = list_first_entry_or_null(&processqueue, |
890 | struct processqueue_entry, list); |
891 | if (!pentry) { |
892 | process_dlm_messages_pending = false; |
893 | spin_unlock(lock: &processqueue_lock); |
894 | break; |
895 | } |
896 | |
897 | list_del(entry: &pentry->list); |
898 | atomic_dec(v: &processqueue_count); |
899 | spin_unlock(lock: &processqueue_lock); |
900 | } |
901 | } |
902 | |
903 | /* Data received from remote end */ |
904 | static int receive_from_sock(struct connection *con, int buflen) |
905 | { |
906 | struct processqueue_entry *pentry; |
907 | int ret, buflen_real; |
908 | struct msghdr msg; |
909 | struct kvec iov; |
910 | |
911 | pentry = new_processqueue_entry(nodeid: con->nodeid, buflen); |
912 | if (!pentry) |
913 | return DLM_IO_RESCHED; |
914 | |
915 | memcpy(pentry->buf, con->rx_leftover_buf, con->rx_leftover); |
916 | |
917 | /* calculate new buffer parameter regarding last receive and |
918 | * possible leftover bytes |
919 | */ |
920 | iov.iov_base = pentry->buf + con->rx_leftover; |
921 | iov.iov_len = buflen - con->rx_leftover; |
922 | |
923 | memset(&msg, 0, sizeof(msg)); |
924 | msg.msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL; |
925 | clear_bit(CF_RECV_INTR, addr: &con->flags); |
926 | again: |
927 | ret = kernel_recvmsg(sock: con->sock, msg: &msg, vec: &iov, num: 1, len: iov.iov_len, |
928 | flags: msg.msg_flags); |
929 | trace_dlm_recv(nodeid: con->nodeid, ret); |
930 | if (ret == -EAGAIN) { |
931 | lock_sock(sk: con->sock->sk); |
932 | if (test_and_clear_bit(CF_RECV_INTR, addr: &con->flags)) { |
933 | release_sock(sk: con->sock->sk); |
934 | goto again; |
935 | } |
936 | |
937 | clear_bit(CF_RECV_PENDING, addr: &con->flags); |
938 | release_sock(sk: con->sock->sk); |
939 | free_processqueue_entry(pentry); |
940 | return DLM_IO_END; |
941 | } else if (ret == 0) { |
942 | /* close will clear CF_RECV_PENDING */ |
943 | free_processqueue_entry(pentry); |
944 | return DLM_IO_EOF; |
945 | } else if (ret < 0) { |
946 | free_processqueue_entry(pentry); |
947 | return ret; |
948 | } |
949 | |
950 | /* new buflen according readed bytes and leftover from last receive */ |
951 | buflen_real = ret + con->rx_leftover; |
952 | ret = dlm_validate_incoming_buffer(nodeid: con->nodeid, buf: pentry->buf, |
953 | len: buflen_real); |
954 | if (ret < 0) { |
955 | free_processqueue_entry(pentry); |
956 | return ret; |
957 | } |
958 | |
959 | pentry->buflen = ret; |
960 | |
961 | /* calculate leftover bytes from process and put it into begin of |
962 | * the receive buffer, so next receive we have the full message |
963 | * at the start address of the receive buffer. |
964 | */ |
965 | con->rx_leftover = buflen_real - ret; |
966 | memmove(con->rx_leftover_buf, pentry->buf + ret, |
967 | con->rx_leftover); |
968 | |
969 | spin_lock(lock: &processqueue_lock); |
970 | ret = atomic_inc_return(v: &processqueue_count); |
971 | list_add_tail(new: &pentry->list, head: &processqueue); |
972 | if (!process_dlm_messages_pending) { |
973 | process_dlm_messages_pending = true; |
974 | queue_work(wq: process_workqueue, work: &process_work); |
975 | } |
976 | spin_unlock(lock: &processqueue_lock); |
977 | |
978 | if (ret > DLM_MAX_PROCESS_BUFFERS) |
979 | return DLM_IO_FLUSH; |
980 | |
981 | return DLM_IO_SUCCESS; |
982 | } |
983 | |
984 | /* Listening socket is busy, accept a connection */ |
985 | static int accept_from_sock(void) |
986 | { |
987 | struct sockaddr_storage peeraddr; |
988 | int len, idx, result, nodeid; |
989 | struct connection *newcon; |
990 | struct socket *newsock; |
991 | unsigned int mark; |
992 | |
993 | result = kernel_accept(sock: listen_con.sock, newsock: &newsock, O_NONBLOCK); |
994 | if (result == -EAGAIN) |
995 | return DLM_IO_END; |
996 | else if (result < 0) |
997 | goto accept_err; |
998 | |
999 | /* Get the connected socket's peer */ |
1000 | memset(&peeraddr, 0, sizeof(peeraddr)); |
1001 | len = newsock->ops->getname(newsock, (struct sockaddr *)&peeraddr, 2); |
1002 | if (len < 0) { |
1003 | result = -ECONNABORTED; |
1004 | goto accept_err; |
1005 | } |
1006 | |
1007 | /* Get the new node's NODEID */ |
1008 | make_sockaddr(saddr: &peeraddr, port: 0, addr_len: &len); |
1009 | if (addr_to_nodeid(addr: &peeraddr, nodeid: &nodeid, mark: &mark)) { |
1010 | switch (peeraddr.ss_family) { |
1011 | case AF_INET: { |
1012 | struct sockaddr_in *sin = (struct sockaddr_in *)&peeraddr; |
1013 | |
1014 | log_print("connect from non cluster IPv4 node %pI4" , |
1015 | &sin->sin_addr); |
1016 | break; |
1017 | } |
1018 | #if IS_ENABLED(CONFIG_IPV6) |
1019 | case AF_INET6: { |
1020 | struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)&peeraddr; |
1021 | |
1022 | log_print("connect from non cluster IPv6 node %pI6c" , |
1023 | &sin6->sin6_addr); |
1024 | break; |
1025 | } |
1026 | #endif |
1027 | default: |
1028 | log_print("invalid family from non cluster node" ); |
1029 | break; |
1030 | } |
1031 | |
1032 | sock_release(sock: newsock); |
1033 | return -1; |
1034 | } |
1035 | |
1036 | log_print("got connection from %d" , nodeid); |
1037 | |
1038 | /* Check to see if we already have a connection to this node. This |
1039 | * could happen if the two nodes initiate a connection at roughly |
1040 | * the same time and the connections cross on the wire. |
1041 | * In this case we store the incoming one in "othercon" |
1042 | */ |
1043 | idx = srcu_read_lock(ssp: &connections_srcu); |
1044 | newcon = nodeid2con(nodeid, alloc: 0); |
1045 | if (WARN_ON_ONCE(!newcon)) { |
1046 | srcu_read_unlock(ssp: &connections_srcu, idx); |
1047 | result = -ENOENT; |
1048 | goto accept_err; |
1049 | } |
1050 | |
1051 | sock_set_mark(sk: newsock->sk, val: mark); |
1052 | |
1053 | down_write(sem: &newcon->sock_lock); |
1054 | if (newcon->sock) { |
1055 | struct connection *othercon = newcon->othercon; |
1056 | |
1057 | if (!othercon) { |
1058 | othercon = kzalloc(size: sizeof(*othercon), GFP_NOFS); |
1059 | if (!othercon) { |
1060 | log_print("failed to allocate incoming socket" ); |
1061 | up_write(sem: &newcon->sock_lock); |
1062 | srcu_read_unlock(ssp: &connections_srcu, idx); |
1063 | result = -ENOMEM; |
1064 | goto accept_err; |
1065 | } |
1066 | |
1067 | dlm_con_init(con: othercon, nodeid); |
1068 | lockdep_set_subclass(&othercon->sock_lock, 1); |
1069 | newcon->othercon = othercon; |
1070 | set_bit(CF_IS_OTHERCON, addr: &othercon->flags); |
1071 | } else { |
1072 | /* close other sock con if we have something new */ |
1073 | close_connection(con: othercon, and_other: false); |
1074 | } |
1075 | |
1076 | down_write(sem: &othercon->sock_lock); |
1077 | add_sock(sock: newsock, con: othercon); |
1078 | |
1079 | /* check if we receved something while adding */ |
1080 | lock_sock(sk: othercon->sock->sk); |
1081 | lowcomms_queue_rwork(con: othercon); |
1082 | release_sock(sk: othercon->sock->sk); |
1083 | up_write(sem: &othercon->sock_lock); |
1084 | } |
1085 | else { |
1086 | /* accept copies the sk after we've saved the callbacks, so we |
1087 | don't want to save them a second time or comm errors will |
1088 | result in calling sk_error_report recursively. */ |
1089 | add_sock(sock: newsock, con: newcon); |
1090 | |
1091 | /* check if we receved something while adding */ |
1092 | lock_sock(sk: newcon->sock->sk); |
1093 | lowcomms_queue_rwork(con: newcon); |
1094 | release_sock(sk: newcon->sock->sk); |
1095 | } |
1096 | up_write(sem: &newcon->sock_lock); |
1097 | srcu_read_unlock(ssp: &connections_srcu, idx); |
1098 | |
1099 | return DLM_IO_SUCCESS; |
1100 | |
1101 | accept_err: |
1102 | if (newsock) |
1103 | sock_release(sock: newsock); |
1104 | |
1105 | return result; |
1106 | } |
1107 | |
1108 | /* |
1109 | * writequeue_entry_complete - try to delete and free write queue entry |
1110 | * @e: write queue entry to try to delete |
1111 | * @completed: bytes completed |
1112 | * |
1113 | * writequeue_lock must be held. |
1114 | */ |
1115 | static void writequeue_entry_complete(struct writequeue_entry *e, int completed) |
1116 | { |
1117 | e->offset += completed; |
1118 | e->len -= completed; |
1119 | /* signal that page was half way transmitted */ |
1120 | e->dirty = true; |
1121 | |
1122 | if (e->len == 0 && e->users == 0) |
1123 | free_entry(e); |
1124 | } |
1125 | |
1126 | /* |
1127 | * sctp_bind_addrs - bind a SCTP socket to all our addresses |
1128 | */ |
1129 | static int sctp_bind_addrs(struct socket *sock, uint16_t port) |
1130 | { |
1131 | struct sockaddr_storage localaddr; |
1132 | struct sockaddr *addr = (struct sockaddr *)&localaddr; |
1133 | int i, addr_len, result = 0; |
1134 | |
1135 | for (i = 0; i < dlm_local_count; i++) { |
1136 | memcpy(&localaddr, &dlm_local_addr[i], sizeof(localaddr)); |
1137 | make_sockaddr(saddr: &localaddr, port, addr_len: &addr_len); |
1138 | |
1139 | if (!i) |
1140 | result = kernel_bind(sock, addr, addrlen: addr_len); |
1141 | else |
1142 | result = sock_bind_add(sk: sock->sk, addr, addr_len); |
1143 | |
1144 | if (result < 0) { |
1145 | log_print("Can't bind to %d addr number %d, %d.\n" , |
1146 | port, i + 1, result); |
1147 | break; |
1148 | } |
1149 | } |
1150 | return result; |
1151 | } |
1152 | |
1153 | /* Get local addresses */ |
1154 | static void init_local(void) |
1155 | { |
1156 | struct sockaddr_storage sas; |
1157 | int i; |
1158 | |
1159 | dlm_local_count = 0; |
1160 | for (i = 0; i < DLM_MAX_ADDR_COUNT; i++) { |
1161 | if (dlm_our_addr(addr: &sas, num: i)) |
1162 | break; |
1163 | |
1164 | memcpy(&dlm_local_addr[dlm_local_count++], &sas, sizeof(sas)); |
1165 | } |
1166 | } |
1167 | |
1168 | static struct writequeue_entry *new_writequeue_entry(struct connection *con) |
1169 | { |
1170 | struct writequeue_entry *entry; |
1171 | |
1172 | entry = dlm_allocate_writequeue(); |
1173 | if (!entry) |
1174 | return NULL; |
1175 | |
1176 | entry->page = alloc_page(GFP_ATOMIC | __GFP_ZERO); |
1177 | if (!entry->page) { |
1178 | dlm_free_writequeue(writequeue: entry); |
1179 | return NULL; |
1180 | } |
1181 | |
1182 | entry->offset = 0; |
1183 | entry->len = 0; |
1184 | entry->end = 0; |
1185 | entry->dirty = false; |
1186 | entry->con = con; |
1187 | entry->users = 1; |
1188 | kref_init(kref: &entry->ref); |
1189 | return entry; |
1190 | } |
1191 | |
1192 | static struct writequeue_entry *new_wq_entry(struct connection *con, int len, |
1193 | char **ppc, void (*cb)(void *data), |
1194 | void *data) |
1195 | { |
1196 | struct writequeue_entry *e; |
1197 | |
1198 | spin_lock_bh(lock: &con->writequeue_lock); |
1199 | if (!list_empty(head: &con->writequeue)) { |
1200 | e = list_last_entry(&con->writequeue, struct writequeue_entry, list); |
1201 | if (DLM_WQ_REMAIN_BYTES(e) >= len) { |
1202 | kref_get(kref: &e->ref); |
1203 | |
1204 | *ppc = page_address(e->page) + e->end; |
1205 | if (cb) |
1206 | cb(data); |
1207 | |
1208 | e->end += len; |
1209 | e->users++; |
1210 | goto out; |
1211 | } |
1212 | } |
1213 | |
1214 | e = new_writequeue_entry(con); |
1215 | if (!e) |
1216 | goto out; |
1217 | |
1218 | kref_get(kref: &e->ref); |
1219 | *ppc = page_address(e->page); |
1220 | e->end += len; |
1221 | if (cb) |
1222 | cb(data); |
1223 | |
1224 | list_add_tail(new: &e->list, head: &con->writequeue); |
1225 | |
1226 | out: |
1227 | spin_unlock_bh(lock: &con->writequeue_lock); |
1228 | return e; |
1229 | }; |
1230 | |
1231 | static struct dlm_msg *dlm_lowcomms_new_msg_con(struct connection *con, int len, |
1232 | gfp_t allocation, char **ppc, |
1233 | void (*cb)(void *data), |
1234 | void *data) |
1235 | { |
1236 | struct writequeue_entry *e; |
1237 | struct dlm_msg *msg; |
1238 | |
1239 | msg = dlm_allocate_msg(allocation); |
1240 | if (!msg) |
1241 | return NULL; |
1242 | |
1243 | kref_init(kref: &msg->ref); |
1244 | |
1245 | e = new_wq_entry(con, len, ppc, cb, data); |
1246 | if (!e) { |
1247 | dlm_free_msg(msg); |
1248 | return NULL; |
1249 | } |
1250 | |
1251 | msg->retransmit = false; |
1252 | msg->orig_msg = NULL; |
1253 | msg->ppc = *ppc; |
1254 | msg->len = len; |
1255 | msg->entry = e; |
1256 | |
1257 | return msg; |
1258 | } |
1259 | |
1260 | /* avoid false positive for nodes_srcu, unlock happens in |
1261 | * dlm_lowcomms_commit_msg which is a must call if success |
1262 | */ |
1263 | #ifndef __CHECKER__ |
1264 | struct dlm_msg *dlm_lowcomms_new_msg(int nodeid, int len, gfp_t allocation, |
1265 | char **ppc, void (*cb)(void *data), |
1266 | void *data) |
1267 | { |
1268 | struct connection *con; |
1269 | struct dlm_msg *msg; |
1270 | int idx; |
1271 | |
1272 | if (len > DLM_MAX_SOCKET_BUFSIZE || |
1273 | len < sizeof(struct dlm_header)) { |
1274 | BUILD_BUG_ON(PAGE_SIZE < DLM_MAX_SOCKET_BUFSIZE); |
1275 | log_print("failed to allocate a buffer of size %d" , len); |
1276 | WARN_ON_ONCE(1); |
1277 | return NULL; |
1278 | } |
1279 | |
1280 | idx = srcu_read_lock(ssp: &connections_srcu); |
1281 | con = nodeid2con(nodeid, alloc: 0); |
1282 | if (WARN_ON_ONCE(!con)) { |
1283 | srcu_read_unlock(ssp: &connections_srcu, idx); |
1284 | return NULL; |
1285 | } |
1286 | |
1287 | msg = dlm_lowcomms_new_msg_con(con, len, allocation, ppc, cb, data); |
1288 | if (!msg) { |
1289 | srcu_read_unlock(ssp: &connections_srcu, idx); |
1290 | return NULL; |
1291 | } |
1292 | |
1293 | /* for dlm_lowcomms_commit_msg() */ |
1294 | kref_get(kref: &msg->ref); |
1295 | /* we assume if successful commit must called */ |
1296 | msg->idx = idx; |
1297 | return msg; |
1298 | } |
1299 | #endif |
1300 | |
1301 | static void _dlm_lowcomms_commit_msg(struct dlm_msg *msg) |
1302 | { |
1303 | struct writequeue_entry *e = msg->entry; |
1304 | struct connection *con = e->con; |
1305 | int users; |
1306 | |
1307 | spin_lock_bh(lock: &con->writequeue_lock); |
1308 | kref_get(kref: &msg->ref); |
1309 | list_add(new: &msg->list, head: &e->msgs); |
1310 | |
1311 | users = --e->users; |
1312 | if (users) |
1313 | goto out; |
1314 | |
1315 | e->len = DLM_WQ_LENGTH_BYTES(e); |
1316 | |
1317 | lowcomms_queue_swork(con); |
1318 | |
1319 | out: |
1320 | spin_unlock_bh(lock: &con->writequeue_lock); |
1321 | return; |
1322 | } |
1323 | |
1324 | /* avoid false positive for nodes_srcu, lock was happen in |
1325 | * dlm_lowcomms_new_msg |
1326 | */ |
1327 | #ifndef __CHECKER__ |
1328 | void dlm_lowcomms_commit_msg(struct dlm_msg *msg) |
1329 | { |
1330 | _dlm_lowcomms_commit_msg(msg); |
1331 | srcu_read_unlock(ssp: &connections_srcu, idx: msg->idx); |
1332 | /* because dlm_lowcomms_new_msg() */ |
1333 | kref_put(kref: &msg->ref, release: dlm_msg_release); |
1334 | } |
1335 | #endif |
1336 | |
1337 | void dlm_lowcomms_put_msg(struct dlm_msg *msg) |
1338 | { |
1339 | kref_put(kref: &msg->ref, release: dlm_msg_release); |
1340 | } |
1341 | |
1342 | /* does not held connections_srcu, usage lowcomms_error_report only */ |
1343 | int dlm_lowcomms_resend_msg(struct dlm_msg *msg) |
1344 | { |
1345 | struct dlm_msg *msg_resend; |
1346 | char *ppc; |
1347 | |
1348 | if (msg->retransmit) |
1349 | return 1; |
1350 | |
1351 | msg_resend = dlm_lowcomms_new_msg_con(con: msg->entry->con, len: msg->len, |
1352 | GFP_ATOMIC, ppc: &ppc, NULL, NULL); |
1353 | if (!msg_resend) |
1354 | return -ENOMEM; |
1355 | |
1356 | msg->retransmit = true; |
1357 | kref_get(kref: &msg->ref); |
1358 | msg_resend->orig_msg = msg; |
1359 | |
1360 | memcpy(ppc, msg->ppc, msg->len); |
1361 | _dlm_lowcomms_commit_msg(msg: msg_resend); |
1362 | dlm_lowcomms_put_msg(msg: msg_resend); |
1363 | |
1364 | return 0; |
1365 | } |
1366 | |
1367 | /* Send a message */ |
1368 | static int send_to_sock(struct connection *con) |
1369 | { |
1370 | struct writequeue_entry *e; |
1371 | struct bio_vec bvec; |
1372 | struct msghdr msg = { |
1373 | .msg_flags = MSG_SPLICE_PAGES | MSG_DONTWAIT | MSG_NOSIGNAL, |
1374 | }; |
1375 | int len, offset, ret; |
1376 | |
1377 | spin_lock_bh(lock: &con->writequeue_lock); |
1378 | e = con_next_wq(con); |
1379 | if (!e) { |
1380 | clear_bit(CF_SEND_PENDING, addr: &con->flags); |
1381 | spin_unlock_bh(lock: &con->writequeue_lock); |
1382 | return DLM_IO_END; |
1383 | } |
1384 | |
1385 | len = e->len; |
1386 | offset = e->offset; |
1387 | WARN_ON_ONCE(len == 0 && e->users == 0); |
1388 | spin_unlock_bh(lock: &con->writequeue_lock); |
1389 | |
1390 | bvec_set_page(bv: &bvec, page: e->page, len, offset); |
1391 | iov_iter_bvec(i: &msg.msg_iter, ITER_SOURCE, bvec: &bvec, nr_segs: 1, count: len); |
1392 | ret = sock_sendmsg(sock: con->sock, msg: &msg); |
1393 | trace_dlm_send(nodeid: con->nodeid, ret); |
1394 | if (ret == -EAGAIN || ret == 0) { |
1395 | lock_sock(sk: con->sock->sk); |
1396 | spin_lock_bh(lock: &con->writequeue_lock); |
1397 | if (test_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags) && |
1398 | !test_and_set_bit(CF_APP_LIMITED, addr: &con->flags)) { |
1399 | /* Notify TCP that we're limited by the |
1400 | * application window size. |
1401 | */ |
1402 | set_bit(SOCK_NOSPACE, addr: &con->sock->sk->sk_socket->flags); |
1403 | con->sock->sk->sk_write_pending++; |
1404 | |
1405 | clear_bit(CF_SEND_PENDING, addr: &con->flags); |
1406 | spin_unlock_bh(lock: &con->writequeue_lock); |
1407 | release_sock(sk: con->sock->sk); |
1408 | |
1409 | /* wait for write_space() event */ |
1410 | return DLM_IO_END; |
1411 | } |
1412 | spin_unlock_bh(lock: &con->writequeue_lock); |
1413 | release_sock(sk: con->sock->sk); |
1414 | |
1415 | return DLM_IO_RESCHED; |
1416 | } else if (ret < 0) { |
1417 | return ret; |
1418 | } |
1419 | |
1420 | spin_lock_bh(lock: &con->writequeue_lock); |
1421 | writequeue_entry_complete(e, completed: ret); |
1422 | spin_unlock_bh(lock: &con->writequeue_lock); |
1423 | |
1424 | return DLM_IO_SUCCESS; |
1425 | } |
1426 | |
1427 | static void clean_one_writequeue(struct connection *con) |
1428 | { |
1429 | struct writequeue_entry *e, *safe; |
1430 | |
1431 | spin_lock_bh(lock: &con->writequeue_lock); |
1432 | list_for_each_entry_safe(e, safe, &con->writequeue, list) { |
1433 | free_entry(e); |
1434 | } |
1435 | spin_unlock_bh(lock: &con->writequeue_lock); |
1436 | } |
1437 | |
1438 | static void connection_release(struct rcu_head *rcu) |
1439 | { |
1440 | struct connection *con = container_of(rcu, struct connection, rcu); |
1441 | |
1442 | WARN_ON_ONCE(!list_empty(&con->writequeue)); |
1443 | WARN_ON_ONCE(con->sock); |
1444 | kfree(objp: con); |
1445 | } |
1446 | |
1447 | /* Called from recovery when it knows that a node has |
1448 | left the cluster */ |
1449 | int dlm_lowcomms_close(int nodeid) |
1450 | { |
1451 | struct connection *con; |
1452 | int idx; |
1453 | |
1454 | log_print("closing connection to node %d" , nodeid); |
1455 | |
1456 | idx = srcu_read_lock(ssp: &connections_srcu); |
1457 | con = nodeid2con(nodeid, alloc: 0); |
1458 | if (WARN_ON_ONCE(!con)) { |
1459 | srcu_read_unlock(ssp: &connections_srcu, idx); |
1460 | return -ENOENT; |
1461 | } |
1462 | |
1463 | stop_connection_io(con); |
1464 | log_print("io handling for node: %d stopped" , nodeid); |
1465 | close_connection(con, and_other: true); |
1466 | |
1467 | spin_lock(lock: &connections_lock); |
1468 | hlist_del_rcu(n: &con->list); |
1469 | spin_unlock(lock: &connections_lock); |
1470 | |
1471 | clean_one_writequeue(con); |
1472 | call_srcu(ssp: &connections_srcu, head: &con->rcu, func: connection_release); |
1473 | if (con->othercon) { |
1474 | clean_one_writequeue(con: con->othercon); |
1475 | call_srcu(ssp: &connections_srcu, head: &con->othercon->rcu, func: connection_release); |
1476 | } |
1477 | srcu_read_unlock(ssp: &connections_srcu, idx); |
1478 | |
1479 | /* for debugging we print when we are done to compare with other |
1480 | * messages in between. This function need to be correctly synchronized |
1481 | * with io handling |
1482 | */ |
1483 | log_print("closing connection to node %d done" , nodeid); |
1484 | |
1485 | return 0; |
1486 | } |
1487 | |
1488 | /* Receive worker function */ |
1489 | static void process_recv_sockets(struct work_struct *work) |
1490 | { |
1491 | struct connection *con = container_of(work, struct connection, rwork); |
1492 | int ret, buflen; |
1493 | |
1494 | down_read(sem: &con->sock_lock); |
1495 | if (!con->sock) { |
1496 | up_read(sem: &con->sock_lock); |
1497 | return; |
1498 | } |
1499 | |
1500 | buflen = READ_ONCE(dlm_config.ci_buffer_size); |
1501 | do { |
1502 | ret = receive_from_sock(con, buflen); |
1503 | } while (ret == DLM_IO_SUCCESS); |
1504 | up_read(sem: &con->sock_lock); |
1505 | |
1506 | switch (ret) { |
1507 | case DLM_IO_END: |
1508 | /* CF_RECV_PENDING cleared */ |
1509 | break; |
1510 | case DLM_IO_EOF: |
1511 | close_connection(con, and_other: false); |
1512 | wake_up(&con->shutdown_wait); |
1513 | /* CF_RECV_PENDING cleared */ |
1514 | break; |
1515 | case DLM_IO_FLUSH: |
1516 | flush_workqueue(process_workqueue); |
1517 | fallthrough; |
1518 | case DLM_IO_RESCHED: |
1519 | cond_resched(); |
1520 | queue_work(wq: io_workqueue, work: &con->rwork); |
1521 | /* CF_RECV_PENDING not cleared */ |
1522 | break; |
1523 | default: |
1524 | if (ret < 0) { |
1525 | if (test_bit(CF_IS_OTHERCON, &con->flags)) { |
1526 | close_connection(con, and_other: false); |
1527 | } else { |
1528 | spin_lock_bh(lock: &con->writequeue_lock); |
1529 | lowcomms_queue_swork(con); |
1530 | spin_unlock_bh(lock: &con->writequeue_lock); |
1531 | } |
1532 | |
1533 | /* CF_RECV_PENDING cleared for othercon |
1534 | * we trigger send queue if not already done |
1535 | * and process_send_sockets will handle it |
1536 | */ |
1537 | break; |
1538 | } |
1539 | |
1540 | WARN_ON_ONCE(1); |
1541 | break; |
1542 | } |
1543 | } |
1544 | |
1545 | static void process_listen_recv_socket(struct work_struct *work) |
1546 | { |
1547 | int ret; |
1548 | |
1549 | if (WARN_ON_ONCE(!listen_con.sock)) |
1550 | return; |
1551 | |
1552 | do { |
1553 | ret = accept_from_sock(); |
1554 | } while (ret == DLM_IO_SUCCESS); |
1555 | |
1556 | if (ret < 0) |
1557 | log_print("critical error accepting connection: %d" , ret); |
1558 | } |
1559 | |
1560 | static int dlm_connect(struct connection *con) |
1561 | { |
1562 | struct sockaddr_storage addr; |
1563 | int result, addr_len; |
1564 | struct socket *sock; |
1565 | unsigned int mark; |
1566 | |
1567 | memset(&addr, 0, sizeof(addr)); |
1568 | result = nodeid_to_addr(nodeid: con->nodeid, sas_out: &addr, NULL, |
1569 | try_new_addr: dlm_proto_ops->try_new_addr, mark: &mark); |
1570 | if (result < 0) { |
1571 | log_print("no address for nodeid %d" , con->nodeid); |
1572 | return result; |
1573 | } |
1574 | |
1575 | /* Create a socket to communicate with */ |
1576 | result = sock_create_kern(net: &init_net, family: dlm_local_addr[0].ss_family, |
1577 | type: SOCK_STREAM, proto: dlm_proto_ops->proto, res: &sock); |
1578 | if (result < 0) |
1579 | return result; |
1580 | |
1581 | sock_set_mark(sk: sock->sk, val: mark); |
1582 | dlm_proto_ops->sockopts(sock); |
1583 | |
1584 | result = dlm_proto_ops->bind(sock); |
1585 | if (result < 0) { |
1586 | sock_release(sock); |
1587 | return result; |
1588 | } |
1589 | |
1590 | add_sock(sock, con); |
1591 | |
1592 | log_print_ratelimited("connecting to %d" , con->nodeid); |
1593 | make_sockaddr(saddr: &addr, port: dlm_config.ci_tcp_port, addr_len: &addr_len); |
1594 | result = dlm_proto_ops->connect(con, sock, (struct sockaddr *)&addr, |
1595 | addr_len); |
1596 | switch (result) { |
1597 | case -EINPROGRESS: |
1598 | /* not an error */ |
1599 | fallthrough; |
1600 | case 0: |
1601 | break; |
1602 | default: |
1603 | if (result < 0) |
1604 | dlm_close_sock(sock: &con->sock); |
1605 | |
1606 | break; |
1607 | } |
1608 | |
1609 | return result; |
1610 | } |
1611 | |
1612 | /* Send worker function */ |
1613 | static void process_send_sockets(struct work_struct *work) |
1614 | { |
1615 | struct connection *con = container_of(work, struct connection, swork); |
1616 | int ret; |
1617 | |
1618 | WARN_ON_ONCE(test_bit(CF_IS_OTHERCON, &con->flags)); |
1619 | |
1620 | down_read(sem: &con->sock_lock); |
1621 | if (!con->sock) { |
1622 | up_read(sem: &con->sock_lock); |
1623 | down_write(sem: &con->sock_lock); |
1624 | if (!con->sock) { |
1625 | ret = dlm_connect(con); |
1626 | switch (ret) { |
1627 | case 0: |
1628 | break; |
1629 | case -EINPROGRESS: |
1630 | /* avoid spamming resched on connection |
1631 | * we might can switch to a state_change |
1632 | * event based mechanism if established |
1633 | */ |
1634 | msleep(msecs: 100); |
1635 | break; |
1636 | default: |
1637 | /* CF_SEND_PENDING not cleared */ |
1638 | up_write(sem: &con->sock_lock); |
1639 | log_print("connect to node %d try %d error %d" , |
1640 | con->nodeid, con->retries++, ret); |
1641 | msleep(msecs: 1000); |
1642 | /* For now we try forever to reconnect. In |
1643 | * future we should send a event to cluster |
1644 | * manager to fence itself after certain amount |
1645 | * of retries. |
1646 | */ |
1647 | queue_work(wq: io_workqueue, work: &con->swork); |
1648 | return; |
1649 | } |
1650 | } |
1651 | downgrade_write(sem: &con->sock_lock); |
1652 | } |
1653 | |
1654 | do { |
1655 | ret = send_to_sock(con); |
1656 | } while (ret == DLM_IO_SUCCESS); |
1657 | up_read(sem: &con->sock_lock); |
1658 | |
1659 | switch (ret) { |
1660 | case DLM_IO_END: |
1661 | /* CF_SEND_PENDING cleared */ |
1662 | break; |
1663 | case DLM_IO_RESCHED: |
1664 | /* CF_SEND_PENDING not cleared */ |
1665 | cond_resched(); |
1666 | queue_work(wq: io_workqueue, work: &con->swork); |
1667 | break; |
1668 | default: |
1669 | if (ret < 0) { |
1670 | close_connection(con, and_other: false); |
1671 | |
1672 | /* CF_SEND_PENDING cleared */ |
1673 | spin_lock_bh(lock: &con->writequeue_lock); |
1674 | lowcomms_queue_swork(con); |
1675 | spin_unlock_bh(lock: &con->writequeue_lock); |
1676 | break; |
1677 | } |
1678 | |
1679 | WARN_ON_ONCE(1); |
1680 | break; |
1681 | } |
1682 | } |
1683 | |
1684 | static void work_stop(void) |
1685 | { |
1686 | if (io_workqueue) { |
1687 | destroy_workqueue(wq: io_workqueue); |
1688 | io_workqueue = NULL; |
1689 | } |
1690 | |
1691 | if (process_workqueue) { |
1692 | destroy_workqueue(wq: process_workqueue); |
1693 | process_workqueue = NULL; |
1694 | } |
1695 | } |
1696 | |
1697 | static int work_start(void) |
1698 | { |
1699 | io_workqueue = alloc_workqueue(fmt: "dlm_io" , flags: WQ_HIGHPRI | WQ_MEM_RECLAIM | |
1700 | WQ_UNBOUND, max_active: 0); |
1701 | if (!io_workqueue) { |
1702 | log_print("can't start dlm_io" ); |
1703 | return -ENOMEM; |
1704 | } |
1705 | |
1706 | /* ordered dlm message process queue, |
1707 | * should be converted to a tasklet |
1708 | */ |
1709 | process_workqueue = alloc_ordered_workqueue("dlm_process" , |
1710 | WQ_HIGHPRI | WQ_MEM_RECLAIM); |
1711 | if (!process_workqueue) { |
1712 | log_print("can't start dlm_process" ); |
1713 | destroy_workqueue(wq: io_workqueue); |
1714 | io_workqueue = NULL; |
1715 | return -ENOMEM; |
1716 | } |
1717 | |
1718 | return 0; |
1719 | } |
1720 | |
1721 | void dlm_lowcomms_shutdown(void) |
1722 | { |
1723 | struct connection *con; |
1724 | int i, idx; |
1725 | |
1726 | /* stop lowcomms_listen_data_ready calls */ |
1727 | lock_sock(sk: listen_con.sock->sk); |
1728 | listen_con.sock->sk->sk_data_ready = listen_sock.sk_data_ready; |
1729 | release_sock(sk: listen_con.sock->sk); |
1730 | |
1731 | cancel_work_sync(work: &listen_con.rwork); |
1732 | dlm_close_sock(sock: &listen_con.sock); |
1733 | |
1734 | idx = srcu_read_lock(ssp: &connections_srcu); |
1735 | for (i = 0; i < CONN_HASH_SIZE; i++) { |
1736 | hlist_for_each_entry_rcu(con, &connection_hash[i], list) { |
1737 | shutdown_connection(con, and_other: true); |
1738 | stop_connection_io(con); |
1739 | flush_workqueue(process_workqueue); |
1740 | close_connection(con, and_other: true); |
1741 | |
1742 | clean_one_writequeue(con); |
1743 | if (con->othercon) |
1744 | clean_one_writequeue(con: con->othercon); |
1745 | allow_connection_io(con); |
1746 | } |
1747 | } |
1748 | srcu_read_unlock(ssp: &connections_srcu, idx); |
1749 | } |
1750 | |
1751 | void dlm_lowcomms_stop(void) |
1752 | { |
1753 | work_stop(); |
1754 | dlm_proto_ops = NULL; |
1755 | } |
1756 | |
1757 | static int dlm_listen_for_all(void) |
1758 | { |
1759 | struct socket *sock; |
1760 | int result; |
1761 | |
1762 | log_print("Using %s for communications" , |
1763 | dlm_proto_ops->name); |
1764 | |
1765 | result = dlm_proto_ops->listen_validate(); |
1766 | if (result < 0) |
1767 | return result; |
1768 | |
1769 | result = sock_create_kern(net: &init_net, family: dlm_local_addr[0].ss_family, |
1770 | type: SOCK_STREAM, proto: dlm_proto_ops->proto, res: &sock); |
1771 | if (result < 0) { |
1772 | log_print("Can't create comms socket: %d" , result); |
1773 | return result; |
1774 | } |
1775 | |
1776 | sock_set_mark(sk: sock->sk, val: dlm_config.ci_mark); |
1777 | dlm_proto_ops->listen_sockopts(sock); |
1778 | |
1779 | result = dlm_proto_ops->listen_bind(sock); |
1780 | if (result < 0) |
1781 | goto out; |
1782 | |
1783 | lock_sock(sk: sock->sk); |
1784 | listen_sock.sk_data_ready = sock->sk->sk_data_ready; |
1785 | listen_sock.sk_write_space = sock->sk->sk_write_space; |
1786 | listen_sock.sk_error_report = sock->sk->sk_error_report; |
1787 | listen_sock.sk_state_change = sock->sk->sk_state_change; |
1788 | |
1789 | listen_con.sock = sock; |
1790 | |
1791 | sock->sk->sk_allocation = GFP_NOFS; |
1792 | sock->sk->sk_use_task_frag = false; |
1793 | sock->sk->sk_data_ready = lowcomms_listen_data_ready; |
1794 | release_sock(sk: sock->sk); |
1795 | |
1796 | result = sock->ops->listen(sock, 128); |
1797 | if (result < 0) { |
1798 | dlm_close_sock(sock: &listen_con.sock); |
1799 | return result; |
1800 | } |
1801 | |
1802 | return 0; |
1803 | |
1804 | out: |
1805 | sock_release(sock); |
1806 | return result; |
1807 | } |
1808 | |
1809 | static int dlm_tcp_bind(struct socket *sock) |
1810 | { |
1811 | struct sockaddr_storage src_addr; |
1812 | int result, addr_len; |
1813 | |
1814 | /* Bind to our cluster-known address connecting to avoid |
1815 | * routing problems. |
1816 | */ |
1817 | memcpy(&src_addr, &dlm_local_addr[0], sizeof(src_addr)); |
1818 | make_sockaddr(saddr: &src_addr, port: 0, addr_len: &addr_len); |
1819 | |
1820 | result = sock->ops->bind(sock, (struct sockaddr *)&src_addr, |
1821 | addr_len); |
1822 | if (result < 0) { |
1823 | /* This *may* not indicate a critical error */ |
1824 | log_print("could not bind for connect: %d" , result); |
1825 | } |
1826 | |
1827 | return 0; |
1828 | } |
1829 | |
1830 | static int dlm_tcp_connect(struct connection *con, struct socket *sock, |
1831 | struct sockaddr *addr, int addr_len) |
1832 | { |
1833 | return sock->ops->connect(sock, addr, addr_len, O_NONBLOCK); |
1834 | } |
1835 | |
1836 | static int dlm_tcp_listen_validate(void) |
1837 | { |
1838 | /* We don't support multi-homed hosts */ |
1839 | if (dlm_local_count > 1) { |
1840 | log_print("TCP protocol can't handle multi-homed hosts, try SCTP" ); |
1841 | return -EINVAL; |
1842 | } |
1843 | |
1844 | return 0; |
1845 | } |
1846 | |
1847 | static void dlm_tcp_sockopts(struct socket *sock) |
1848 | { |
1849 | /* Turn off Nagle's algorithm */ |
1850 | tcp_sock_set_nodelay(sk: sock->sk); |
1851 | } |
1852 | |
1853 | static void dlm_tcp_listen_sockopts(struct socket *sock) |
1854 | { |
1855 | dlm_tcp_sockopts(sock); |
1856 | sock_set_reuseaddr(sk: sock->sk); |
1857 | } |
1858 | |
1859 | static int dlm_tcp_listen_bind(struct socket *sock) |
1860 | { |
1861 | int addr_len; |
1862 | |
1863 | /* Bind to our port */ |
1864 | make_sockaddr(saddr: &dlm_local_addr[0], port: dlm_config.ci_tcp_port, addr_len: &addr_len); |
1865 | return sock->ops->bind(sock, (struct sockaddr *)&dlm_local_addr[0], |
1866 | addr_len); |
1867 | } |
1868 | |
1869 | static const struct dlm_proto_ops dlm_tcp_ops = { |
1870 | .name = "TCP" , |
1871 | .proto = IPPROTO_TCP, |
1872 | .connect = dlm_tcp_connect, |
1873 | .sockopts = dlm_tcp_sockopts, |
1874 | .bind = dlm_tcp_bind, |
1875 | .listen_validate = dlm_tcp_listen_validate, |
1876 | .listen_sockopts = dlm_tcp_listen_sockopts, |
1877 | .listen_bind = dlm_tcp_listen_bind, |
1878 | }; |
1879 | |
1880 | static int dlm_sctp_bind(struct socket *sock) |
1881 | { |
1882 | return sctp_bind_addrs(sock, port: 0); |
1883 | } |
1884 | |
1885 | static int dlm_sctp_connect(struct connection *con, struct socket *sock, |
1886 | struct sockaddr *addr, int addr_len) |
1887 | { |
1888 | int ret; |
1889 | |
1890 | /* |
1891 | * Make sock->ops->connect() function return in specified time, |
1892 | * since O_NONBLOCK argument in connect() function does not work here, |
1893 | * then, we should restore the default value of this attribute. |
1894 | */ |
1895 | sock_set_sndtimeo(sk: sock->sk, secs: 5); |
1896 | ret = sock->ops->connect(sock, addr, addr_len, 0); |
1897 | sock_set_sndtimeo(sk: sock->sk, secs: 0); |
1898 | return ret; |
1899 | } |
1900 | |
1901 | static int dlm_sctp_listen_validate(void) |
1902 | { |
1903 | if (!IS_ENABLED(CONFIG_IP_SCTP)) { |
1904 | log_print("SCTP is not enabled by this kernel" ); |
1905 | return -EOPNOTSUPP; |
1906 | } |
1907 | |
1908 | request_module("sctp" ); |
1909 | return 0; |
1910 | } |
1911 | |
1912 | static int dlm_sctp_bind_listen(struct socket *sock) |
1913 | { |
1914 | return sctp_bind_addrs(sock, port: dlm_config.ci_tcp_port); |
1915 | } |
1916 | |
1917 | static void dlm_sctp_sockopts(struct socket *sock) |
1918 | { |
1919 | /* Turn off Nagle's algorithm */ |
1920 | sctp_sock_set_nodelay(sk: sock->sk); |
1921 | sock_set_rcvbuf(sk: sock->sk, NEEDED_RMEM); |
1922 | } |
1923 | |
1924 | static const struct dlm_proto_ops dlm_sctp_ops = { |
1925 | .name = "SCTP" , |
1926 | .proto = IPPROTO_SCTP, |
1927 | .try_new_addr = true, |
1928 | .connect = dlm_sctp_connect, |
1929 | .sockopts = dlm_sctp_sockopts, |
1930 | .bind = dlm_sctp_bind, |
1931 | .listen_validate = dlm_sctp_listen_validate, |
1932 | .listen_sockopts = dlm_sctp_sockopts, |
1933 | .listen_bind = dlm_sctp_bind_listen, |
1934 | }; |
1935 | |
1936 | int dlm_lowcomms_start(void) |
1937 | { |
1938 | int error; |
1939 | |
1940 | init_local(); |
1941 | if (!dlm_local_count) { |
1942 | error = -ENOTCONN; |
1943 | log_print("no local IP address has been set" ); |
1944 | goto fail; |
1945 | } |
1946 | |
1947 | error = work_start(); |
1948 | if (error) |
1949 | goto fail; |
1950 | |
1951 | /* Start listening */ |
1952 | switch (dlm_config.ci_protocol) { |
1953 | case DLM_PROTO_TCP: |
1954 | dlm_proto_ops = &dlm_tcp_ops; |
1955 | break; |
1956 | case DLM_PROTO_SCTP: |
1957 | dlm_proto_ops = &dlm_sctp_ops; |
1958 | break; |
1959 | default: |
1960 | log_print("Invalid protocol identifier %d set" , |
1961 | dlm_config.ci_protocol); |
1962 | error = -EINVAL; |
1963 | goto fail_proto_ops; |
1964 | } |
1965 | |
1966 | error = dlm_listen_for_all(); |
1967 | if (error) |
1968 | goto fail_listen; |
1969 | |
1970 | return 0; |
1971 | |
1972 | fail_listen: |
1973 | dlm_proto_ops = NULL; |
1974 | fail_proto_ops: |
1975 | work_stop(); |
1976 | fail: |
1977 | return error; |
1978 | } |
1979 | |
1980 | void dlm_lowcomms_init(void) |
1981 | { |
1982 | int i; |
1983 | |
1984 | for (i = 0; i < CONN_HASH_SIZE; i++) |
1985 | INIT_HLIST_HEAD(&connection_hash[i]); |
1986 | |
1987 | INIT_WORK(&listen_con.rwork, process_listen_recv_socket); |
1988 | } |
1989 | |
1990 | void dlm_lowcomms_exit(void) |
1991 | { |
1992 | struct connection *con; |
1993 | int i, idx; |
1994 | |
1995 | idx = srcu_read_lock(ssp: &connections_srcu); |
1996 | for (i = 0; i < CONN_HASH_SIZE; i++) { |
1997 | hlist_for_each_entry_rcu(con, &connection_hash[i], list) { |
1998 | spin_lock(lock: &connections_lock); |
1999 | hlist_del_rcu(n: &con->list); |
2000 | spin_unlock(lock: &connections_lock); |
2001 | |
2002 | if (con->othercon) |
2003 | call_srcu(ssp: &connections_srcu, head: &con->othercon->rcu, |
2004 | func: connection_release); |
2005 | call_srcu(ssp: &connections_srcu, head: &con->rcu, func: connection_release); |
2006 | } |
2007 | } |
2008 | srcu_read_unlock(ssp: &connections_srcu, idx); |
2009 | } |
2010 | |