corosync  3.0.1
totemsrp.c
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2003-2006 MontaVista Software, Inc.
3  * Copyright (c) 2006-2018 Red Hat, Inc.
4  *
5  * All rights reserved.
6  *
7  * Author: Steven Dake (sdake@redhat.com)
8  *
9  * This software licensed under BSD license, the text of which follows:
10  *
11  * Redistribution and use in source and binary forms, with or without
12  * modification, are permitted provided that the following conditions are met:
13  *
14  * - Redistributions of source code must retain the above copyright notice,
15  * this list of conditions and the following disclaimer.
16  * - Redistributions in binary form must reproduce the above copyright notice,
17  * this list of conditions and the following disclaimer in the documentation
18  * and/or other materials provided with the distribution.
19  * - Neither the name of the MontaVista Software, Inc. nor the names of its
20  * contributors may be used to endorse or promote products derived from this
21  * software without specific prior written permission.
22  *
23  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
24  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
25  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
26  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
27  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
28  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
29  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
30  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
31  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
32  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
33  * THE POSSIBILITY OF SUCH DAMAGE.
34  */
35 
36 /*
37  * The first version of this code was based upon Yair Amir's PhD thesis:
38  * http://www.cs.jhu.edu/~yairamir/phd.ps (ch4,5).
39  *
40  * The current version of totemsrp implements the Totem protocol specified in:
41  * http://citeseer.ist.psu.edu/amir95totem.html
42  *
43  * The deviations from the above published protocols are:
44  * - token hold mode where token doesn't rotate on unused ring - reduces cpu
45  * usage on 1.6ghz xeon from 35% to less then .1 % as measured by top
46  */
47 
48 #include <config.h>
49 
50 #include <assert.h>
51 #ifdef HAVE_ALLOCA_H
52 #include <alloca.h>
53 #endif
54 #include <sys/mman.h>
55 #include <sys/types.h>
56 #include <sys/stat.h>
57 #include <sys/socket.h>
58 #include <netdb.h>
59 #include <sys/un.h>
60 #include <sys/ioctl.h>
61 #include <sys/param.h>
62 #include <netinet/in.h>
63 #include <arpa/inet.h>
64 #include <unistd.h>
65 #include <fcntl.h>
66 #include <stdlib.h>
67 #include <stdio.h>
68 #include <errno.h>
69 #include <sched.h>
70 #include <time.h>
71 #include <sys/time.h>
72 #include <sys/poll.h>
73 #include <sys/uio.h>
74 #include <limits.h>
75 
76 #include <qb/qblist.h>
77 #include <qb/qbdefs.h>
78 #include <qb/qbutil.h>
79 #include <qb/qbloop.h>
80 
81 #include <corosync/swab.h>
82 #include <corosync/sq.h>
83 
84 #define LOGSYS_UTILS_ONLY 1
85 #include <corosync/logsys.h>
86 
87 #include "totemsrp.h"
88 #include "totemnet.h"
89 
90 #include "cs_queue.h"
91 
92 #define LOCALHOST_IP inet_addr("127.0.0.1")
93 #define QUEUE_RTR_ITEMS_SIZE_MAX 16384 /* allow 16384 retransmit items */
94 #define RETRANS_MESSAGE_QUEUE_SIZE_MAX 16384 /* allow 16384 messages to be queued */
95 #define RECEIVED_MESSAGE_QUEUE_SIZE_MAX 500 /* allow 500 messages to be queued */
96 #define MAXIOVS 5
97 #define RETRANSMIT_ENTRIES_MAX 30
98 #define TOKEN_SIZE_MAX 64000 /* bytes */
99 #define LEAVE_DUMMY_NODEID 0
100 
101 /*
102  * SRP address.
103  */
104 struct srp_addr {
	/* Identifier of the node this SRP address refers to (sole member). */
105  unsigned int nodeid;
106 };
107 
108 /*
109  * Rollover handling:
110  * SEQNO_START_MSG is the starting sequence number after a new configuration
111  * This should remain zero, unless testing overflow in which case
112  * 0x7ffff000 and 0xfffff000 are good starting values.
113  *
114  * SEQNO_START_TOKEN is the starting sequence number after a new configuration
115  * for a token. This should remain zero, unless testing overflow in which
116  * case 0x7fffff00 or 0xffffff00 are good starting values.
117  */
118 #define SEQNO_START_MSG 0x0
119 #define SEQNO_START_TOKEN 0x0
120 
121 /*
122  * These can be used to test different rollover points
123  * #define SEQNO_START_MSG 0xfffffe00
124  * #define SEQNO_START_TOKEN 0xfffffe00
125  */
126 
127 /*
128  * These can be used to test the error recovery algorithms
129  * #define TEST_DROP_ORF_TOKEN_PERCENTAGE 30
130  * #define TEST_DROP_COMMIT_TOKEN_PERCENTAGE 30
131  * #define TEST_DROP_MCAST_PERCENTAGE 50
132  * #define TEST_RECOVERY_MSG_COUNT 300
133  */
134 
135 /*
136  * we compare incoming messages to determine if their endian is
137  * different - if so convert them
138  *
139  * do not change
140  */
141 #define ENDIAN_LOCAL 0xff22
142 
144  MESSAGE_TYPE_ORF_TOKEN = 0, /* Ordering, Reliability, Flow (ORF) control Token */
145  MESSAGE_TYPE_MCAST = 1, /* ring ordered multicast message */
146  MESSAGE_TYPE_MEMB_MERGE_DETECT = 2, /* merge rings if there are available rings */
147  MESSAGE_TYPE_MEMB_JOIN = 3, /* membership join message */
148  MESSAGE_TYPE_MEMB_COMMIT_TOKEN = 4, /* membership commit token */
149  MESSAGE_TYPE_TOKEN_HOLD_CANCEL = 5, /* cancel the holding of the token */
150 };
151 
155 };
156 
157 /*
158  * New membership algorithm local variables
159  */
161  struct srp_addr addr;
162  int set;
163 };
164 
165 
167  struct qb_list_head list;
168  int (*callback_fn) (enum totem_callback_token_type type, const void *);
169  enum totem_callback_token_type callback_type;
170  int delete;
171  void *data;
172 };
173 
174 
176  int mcast;
177  int token;
178 };
179 
180 struct mcast {
183  unsigned int seq;
186  unsigned int node_id;
188 } __attribute__((packed));
189 
190 
191 struct rtr_item {
193  unsigned int seq;
194 }__attribute__((packed));
195 
196 
197 struct orf_token {
199  unsigned int seq;
200  unsigned int token_seq;
201  unsigned int aru;
202  unsigned int aru_addr;
204  unsigned int backlog;
205  unsigned int fcc;
208  struct rtr_item rtr_list[0];
209 }__attribute__((packed));
210 
211 
212 struct memb_join {
215  unsigned int proc_list_entries;
216  unsigned int failed_list_entries;
217  unsigned long long ring_seq;
218  unsigned char end_of_memb_join[0];
219 /*
220  * These parts of the data structure are dynamic:
221  * struct srp_addr proc_list[];
222  * struct srp_addr failed_list[];
223  */
224 } __attribute__((packed));
225 
226 
231 } __attribute__((packed));
232 
233 
237 } __attribute__((packed));
238 
239 
242  unsigned int aru;
243  unsigned int high_delivered;
244  unsigned int received_flg;
245 }__attribute__((packed));
246 
247 
250  unsigned int token_seq;
252  unsigned int retrans_flg;
255  unsigned char end_of_commit_token[0];
256 /*
257  * These parts of the data structure are dynamic:
258  *
259  * struct srp_addr addr[PROCESSOR_COUNT_MAX];
260  * struct memb_commit_token_memb_entry memb_list[PROCESSOR_COUNT_MAX];
261  */
262 }__attribute__((packed));
/*
 * An entry in the new-message queues: a pointer to a multicast message
 * buffer and its length in bytes.
 */
264 struct message_item {
265  struct mcast *mcast;
266  unsigned int msg_len;
267 };
270  struct mcast *mcast;
271  unsigned int msg_len;
272 };
273 
279 };
280 
283 
285 
286  /*
287  * Flow control mcasts and remcasts on last and current orf_token
288  */
290 
292 
294 
295  struct consensus_list_item consensus_list[PROCESSOR_COUNT_MAX];
296 
298 
300 
301  struct srp_addr my_id;
302 
303  struct totem_ip_address my_addrs[INTERFACE_MAX];
304 
305  struct srp_addr my_proc_list[PROCESSOR_COUNT_MAX];
306 
307  struct srp_addr my_failed_list[PROCESSOR_COUNT_MAX];
308 
309  struct srp_addr my_new_memb_list[PROCESSOR_COUNT_MAX];
310 
311  struct srp_addr my_trans_memb_list[PROCESSOR_COUNT_MAX];
312 
313  struct srp_addr my_memb_list[PROCESSOR_COUNT_MAX];
314 
315  struct srp_addr my_deliver_memb_list[PROCESSOR_COUNT_MAX];
316 
317  struct srp_addr my_left_memb_list[PROCESSOR_COUNT_MAX];
318 
319  unsigned int my_leave_memb_list[PROCESSOR_COUNT_MAX];
320 
322 
324 
326 
328 
330 
332 
334 
336 
337  struct memb_ring_id my_ring_id;
338 
339  struct memb_ring_id my_old_ring_id;
340 
342 
344 
345  unsigned int my_last_aru;
346 
348 
350 
351  unsigned int my_high_seq_received;
352 
353  unsigned int my_install_seq;
354 
356 
358 
360 
362 
364 
365  /*
366  * Queues used to order, deliver, and recover messages
367  */
368  struct cs_queue new_message_queue;
369 
370  struct cs_queue new_message_queue_trans;
371 
372  struct cs_queue retrans_message_queue;
373 
374  struct sq regular_sort_queue;
375 
376  struct sq recovery_sort_queue;
377 
378  /*
379  * Received up to and including
380  */
381  unsigned int my_aru;
382 
383  unsigned int my_high_delivered;
384 
385  struct qb_list_head token_callback_received_listhead;
386 
387  struct qb_list_head token_callback_sent_listhead;
388 
389  char orf_token_retransmit[TOKEN_SIZE_MAX];
390 
392 
393  unsigned int my_token_seq;
394 
395  /*
396  * Timers
397  */
398  qb_loop_timer_handle timer_pause_timeout;
399 
400  qb_loop_timer_handle timer_orf_token_timeout;
401 
402  qb_loop_timer_handle timer_orf_token_warning;
403 
405 
407 
408  qb_loop_timer_handle timer_merge_detect_timeout;
409 
411 
413 
414  qb_loop_timer_handle memb_timer_state_commit_timeout;
415 
416  qb_loop_timer_handle timer_heartbeat_timeout;
417 
418  /*
419  * Function and data used to log messages
420  */
422 
424 
426 
428 
430 
432 
434 
435  void (*totemsrp_log_printf) (
436  int level,
437  int sybsys,
438  const char *function,
439  const char *file,
440  int line,
441  const char *format, ...)__attribute__((format(printf, 6, 7)));;
442 
444 
445 //TODO struct srp_addr next_memb;
446 
448 
449  struct totem_ip_address mcast_address;
450 
451  void (*totemsrp_deliver_fn) (
452  unsigned int nodeid,
453  const void *msg,
454  unsigned int msg_len,
455  int endian_conversion_required);
456 
457  void (*totemsrp_confchg_fn) (
458  enum totem_configuration_type configuration_type,
459  const unsigned int *member_list, size_t member_list_entries,
460  const unsigned int *left_list, size_t left_list_entries,
461  const unsigned int *joined_list, size_t joined_list_entries,
462  const struct memb_ring_id *ring_id);
463 
464  void (*totemsrp_service_ready_fn) (void);
465 
466  void (*totemsrp_waiting_trans_ack_cb_fn) (
467  int waiting_trans_ack);
468 
469  void (*memb_ring_id_create_or_load) (
470  struct memb_ring_id *memb_ring_id,
471  unsigned int nodeid);
472 
473  void (*memb_ring_id_store) (
474  const struct memb_ring_id *memb_ring_id,
475  unsigned int nodeid);
476 
478 
480 
481  unsigned long long token_ring_id_seq;
482 
483  unsigned int last_released;
484 
485  unsigned int set_aru;
486 
488 
490 
492 
493  unsigned int my_last_seq;
494 
495  struct timeval tv_old;
496 
498 
500 
501  unsigned int use_heartbeat;
502 
503  unsigned int my_trc;
504 
505  unsigned int my_pbl;
506 
507  unsigned int my_cbl;
508 
509  uint64_t pause_timestamp;
510 
512 
514 
516 
518 
520 
522 
523  int flushing;
524 
527  char commit_token_storage[40000];
528 };
529 
531  int count;
532  int (*handler_functions[6]) (
533  struct totemsrp_instance *instance,
534  const void *msg,
535  size_t msg_len,
536  int endian_conversion_needed);
537 };
538 
557 };
558 
/*
 * Human-readable descriptions of the reasons for entering the GATHER
 * state, indexed by enum gather_state_from (see gsfrom_to_msg()).
 */
559 const char* gather_state_from_desc [] = {
560  [TOTEMSRP_GSFROM_CONSENSUS_TIMEOUT] = "consensus timeout",
561  [TOTEMSRP_GSFROM_GATHER_MISSING1] = "MISSING",
562  [TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_OPERATIONAL_STATE] = "The token was lost in the OPERATIONAL state.",
563  [TOTEMSRP_GSFROM_THE_CONSENSUS_TIMEOUT_EXPIRED] = "The consensus timeout expired.",
564  [TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_COMMIT_STATE] = "The token was lost in the COMMIT state.",
565  [TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_RECOVERY_STATE] = "The token was lost in the RECOVERY state.",
566  [TOTEMSRP_GSFROM_FAILED_TO_RECEIVE] = "failed to receive",
567  [TOTEMSRP_GSFROM_FOREIGN_MESSAGE_IN_OPERATIONAL_STATE] = "foreign message in operational state",
568  [TOTEMSRP_GSFROM_FOREIGN_MESSAGE_IN_GATHER_STATE] = "foreign message in gather state",
569  [TOTEMSRP_GSFROM_MERGE_DURING_OPERATIONAL_STATE] = "merge during operational state",
570  [TOTEMSRP_GSFROM_MERGE_DURING_GATHER_STATE] = "merge during gather state",
571  [TOTEMSRP_GSFROM_MERGE_DURING_JOIN] = "merge during join",
572  [TOTEMSRP_GSFROM_JOIN_DURING_OPERATIONAL_STATE] = "join during operational state",
573  [TOTEMSRP_GSFROM_JOIN_DURING_COMMIT_STATE] = "join during commit state",
574  [TOTEMSRP_GSFROM_JOIN_DURING_RECOVERY] = "join during recovery",
575  [TOTEMSRP_GSFROM_INTERFACE_CHANGE] = "interface change",
576 };
577 
578 /*
579  * forward decls
580  */
581 static int message_handler_orf_token (
582  struct totemsrp_instance *instance,
583  const void *msg,
584  size_t msg_len,
585  int endian_conversion_needed);
586 
587 static int message_handler_mcast (
588  struct totemsrp_instance *instance,
589  const void *msg,
590  size_t msg_len,
591  int endian_conversion_needed);
592 
593 static int message_handler_memb_merge_detect (
594  struct totemsrp_instance *instance,
595  const void *msg,
596  size_t msg_len,
597  int endian_conversion_needed);
598 
599 static int message_handler_memb_join (
600  struct totemsrp_instance *instance,
601  const void *msg,
602  size_t msg_len,
603  int endian_conversion_needed);
604 
605 static int message_handler_memb_commit_token (
606  struct totemsrp_instance *instance,
607  const void *msg,
608  size_t msg_len,
609  int endian_conversion_needed);
610 
611 static int message_handler_token_hold_cancel (
612  struct totemsrp_instance *instance,
613  const void *msg,
614  size_t msg_len,
615  int endian_conversion_needed);
616 
617 static void totemsrp_instance_initialize (struct totemsrp_instance *instance);
618 
619 static void srp_addr_copy (struct srp_addr *dest, const struct srp_addr *src);
620 
621 static void srp_addr_to_nodeid (
622  struct totemsrp_instance *instance,
623  unsigned int *nodeid_out,
624  struct srp_addr *srp_addr_in,
625  unsigned int entries);
626 
627 static int srp_addr_equal (const struct srp_addr *a, const struct srp_addr *b);
628 
629 static void memb_leave_message_send (struct totemsrp_instance *instance);
630 
631 static void token_callbacks_execute (struct totemsrp_instance *instance, enum totem_callback_token_type type);
632 static void memb_state_gather_enter (struct totemsrp_instance *instance, enum gather_state_from gather_from);
633 static void messages_deliver_to_app (struct totemsrp_instance *instance, int skip, unsigned int end_point);
634 static int orf_token_mcast (struct totemsrp_instance *instance, struct orf_token *oken,
635  int fcc_mcasts_allowed);
636 static void messages_free (struct totemsrp_instance *instance, unsigned int token_aru);
637 
638 static void memb_ring_id_set (struct totemsrp_instance *instance,
639  const struct memb_ring_id *ring_id);
640 static void target_set_completed (void *context);
641 static void memb_state_commit_token_update (struct totemsrp_instance *instance);
642 static void memb_state_commit_token_target_set (struct totemsrp_instance *instance);
643 static int memb_state_commit_token_send (struct totemsrp_instance *instance);
644 static int memb_state_commit_token_send_recovery (struct totemsrp_instance *instance, struct memb_commit_token *memb_commit_token);
645 static void memb_state_commit_token_create (struct totemsrp_instance *instance);
646 static int token_hold_cancel_send (struct totemsrp_instance *instance);
647 static void orf_token_endian_convert (const struct orf_token *in, struct orf_token *out);
648 static void memb_commit_token_endian_convert (const struct memb_commit_token *in, struct memb_commit_token *out);
649 static void memb_join_endian_convert (const struct memb_join *in, struct memb_join *out);
650 static void mcast_endian_convert (const struct mcast *in, struct mcast *out);
651 static void memb_merge_detect_endian_convert (
652  const struct memb_merge_detect *in,
653  struct memb_merge_detect *out);
654 static void srp_addr_copy_endian_convert (struct srp_addr *out, const struct srp_addr *in);
655 static void timer_function_orf_token_timeout (void *data);
656 static void timer_function_orf_token_warning (void *data);
657 static void timer_function_pause_timeout (void *data);
658 static void timer_function_heartbeat_timeout (void *data);
659 static void timer_function_token_retransmit_timeout (void *data);
660 static void timer_function_token_hold_retransmit_timeout (void *data);
661 static void timer_function_merge_detect_timeout (void *data);
662 static void *totemsrp_buffer_alloc (struct totemsrp_instance *instance);
663 static void totemsrp_buffer_release (struct totemsrp_instance *instance, void *ptr);
664 static const char* gsfrom_to_msg(enum gather_state_from gsfrom);
665 
666 void main_deliver_fn (
667  void *context,
668  const void *msg,
669  unsigned int msg_len,
670  const struct sockaddr_storage *system_from);
671 
673  void *context,
674  const struct totem_ip_address *iface_address,
675  unsigned int iface_no);
676 
678  6,
679  {
680  message_handler_orf_token, /* MESSAGE_TYPE_ORF_TOKEN */
681  message_handler_mcast, /* MESSAGE_TYPE_MCAST */
682  message_handler_memb_merge_detect, /* MESSAGE_TYPE_MEMB_MERGE_DETECT */
683  message_handler_memb_join, /* MESSAGE_TYPE_MEMB_JOIN */
684  message_handler_memb_commit_token, /* MESSAGE_TYPE_MEMB_COMMIT_TOKEN */
685  message_handler_token_hold_cancel /* MESSAGE_TYPE_TOKEN_HOLD_CANCEL */
686  }
687 };
688 
/*
 * Log helper: forwards to the per-instance log callback together with
 * subsystem id, function, file and line context.  Requires a local
 * "instance" pointer in the calling scope.
 *
 * Fix: no trailing semicolon after "while (0)" — the stray ";" in the
 * original defeated the do/while(0) idiom, so
 *   if (cond) log_printf(...); else ...
 * would not compile (two statements expanded in the if body).
 */
#define log_printf(level, format, args...) \
do { \
	instance->totemsrp_log_printf ( \
		level, instance->totemsrp_subsys_id, \
		__FUNCTION__, __FILE__, __LINE__, \
		format, ##args); \
} while (0)
/*
 * Log an errno-style error: renders err_num via qb_strerror_r() into a
 * stack buffer and appends ": <message> (<errno>)" to the formatted text.
 *
 * Fix: err_num is captured once into a local so side-effecting arguments
 * (e.g. LOGSYS_PERROR(errno, ...) after a call that may clobber errno,
 * or an expression with side effects) are not evaluated twice.
 */
#define LOGSYS_PERROR(err_num, level, fmt, args...) \
do { \
	int _err_num = (err_num); \
	char _error_str[LOGSYS_MAX_PERROR_MSG_LEN]; \
	const char *_error_ptr = qb_strerror_r(_err_num, _error_str, sizeof(_error_str)); \
	instance->totemsrp_log_printf ( \
		level, instance->totemsrp_subsys_id, \
		__FUNCTION__, __FILE__, __LINE__, \
		fmt ": %s (%d)\n", ##args, _error_ptr, _err_num); \
 } while(0)
705 
706 static const char* gsfrom_to_msg(enum gather_state_from gsfrom)
707 {
708  if (gsfrom <= TOTEMSRP_GSFROM_MAX) {
709  return gather_state_from_desc[gsfrom];
710  }
711  else {
712  return "UNKNOWN";
713  }
714 }
715 
/*
 * Reset a totemsrp_instance to its initial state: zero the structure,
 * initialize the token callback lists and seed the token/message
 * sequence counters for a fresh configuration.
 *
 * NOTE(review): this listing is a doxygen scrape with gaps — original
 * lines 728-737 are only partially present (statements between
 * my_token_seq and set_aru, and after my_aru, are missing), so the
 * visible code is kept byte-identical below.
 */
716 static void totemsrp_instance_initialize (struct totemsrp_instance *instance)
717 {
718  memset (instance, 0, sizeof (struct totemsrp_instance));
719 
720  qb_list_init (&instance->token_callback_received_listhead);
721 
722  qb_list_init (&instance->token_callback_sent_listhead);
723 
724  instance->my_received_flg = 1;
725 
	/* one before SEQNO_START_TOKEN so the first token is accepted */
726  instance->my_token_seq = SEQNO_START_TOKEN - 1;
727 
729 
730  instance->set_aru = -1;
731 
732  instance->my_aru = SEQNO_START_MSG;
733 
735 
737 
738  instance->orf_token_discard = 0;
739 
740  instance->originated_orf_token = 0;
741 
	/* commit token is built in the statically sized storage buffer */
742  instance->commit_token = (struct memb_commit_token *)instance->commit_token_storage;
743 
744  instance->waiting_trans_ack = 1;
745 }
746 
/*
 * Detect whether the process has been paused (scheduler starvation, VM
 * freeze, ...) for more than half the token timeout, comparing the
 * current monotonic time against the last recorded pause_timestamp.
 * When a pause is detected, membership messages are drained/flushed.
 * Returns the result of the last flush attempt (0 when no pause).
 *
 * NOTE(review): doxygen-scrape gaps — the log_printf head (orig line
 * 757) and the flush call inside the do/while (orig line 763) are
 * missing from this listing; visible code kept byte-identical.
 */
747 static int pause_flush (struct totemsrp_instance *instance)
748 {
749  uint64_t now_msec;
750  uint64_t timestamp_msec;
751  int res = 0;
752 
753  now_msec = (qb_util_nano_current_get () / QB_TIME_NS_IN_MSEC);
754  timestamp_msec = instance->pause_timestamp / QB_TIME_NS_IN_MSEC;
755 
756  if ((now_msec - timestamp_msec) > (instance->totem_config->token_timeout / 2)) {
758  "Process pause detected for %d ms, flushing membership messages.", (unsigned int)(now_msec - timestamp_msec));
759  /*
760  * -1 indicates an error from recvmsg
761  */
762  do {
764  } while (res == -1);
765  }
766  return (res);
767 }
768 
769 static int token_event_stats_collector (enum totem_callback_token_type type, const void *void_instance)
770 {
771  struct totemsrp_instance *instance = (struct totemsrp_instance *)void_instance;
772  uint32_t time_now;
773  unsigned long long nano_secs = qb_util_nano_current_get ();
774 
775  time_now = (nano_secs / QB_TIME_NS_IN_MSEC);
776 
777  if (type == TOTEM_CALLBACK_TOKEN_RECEIVED) {
778  /* incr latest token the index */
779  if (instance->stats.latest_token == (TOTEM_TOKEN_STATS_MAX - 1))
780  instance->stats.latest_token = 0;
781  else
782  instance->stats.latest_token++;
783 
784  if (instance->stats.earliest_token == instance->stats.latest_token) {
785  /* we have filled up the array, start overwriting */
786  if (instance->stats.earliest_token == (TOTEM_TOKEN_STATS_MAX - 1))
787  instance->stats.earliest_token = 0;
788  else
789  instance->stats.earliest_token++;
790 
791  instance->stats.token[instance->stats.earliest_token].rx = 0;
792  instance->stats.token[instance->stats.earliest_token].tx = 0;
793  instance->stats.token[instance->stats.earliest_token].backlog_calc = 0;
794  }
795 
796  instance->stats.token[instance->stats.latest_token].rx = time_now;
797  instance->stats.token[instance->stats.latest_token].tx = 0; /* in case we drop the token */
798  } else {
799  instance->stats.token[instance->stats.latest_token].tx = time_now;
800  }
801  return 0;
802 }
803 
/*
 * Callback invoked by the network layer when the link MTU changes.
 * Stores the usable payload MTU (link MTU minus room for two struct
 * mcast headers) into the totem configuration.
 *
 * NOTE(review): doxygen-scrape gap — the log_printf head (orig line
 * 810) is missing from this listing; visible code kept byte-identical.
 */
804 static void totempg_mtu_changed(void *context, int net_mtu)
805 {
806  struct totemsrp_instance *instance = context;
807 
808  instance->totem_config->net_mtu = net_mtu - 2 * sizeof (struct mcast);
809 
811  "Net MTU changed to %d, new value is %d",
812  net_mtu, instance->totem_config->net_mtu);
813 }
814 
815 /*
816  * Exported interfaces
817  */
819  qb_loop_t *poll_handle,
820  void **srp_context,
821  struct totem_config *totem_config,
822  totempg_stats_t *stats,
823 
824  void (*deliver_fn) (
825  unsigned int nodeid,
826  const void *msg,
827  unsigned int msg_len,
828  int endian_conversion_required),
829 
830  void (*confchg_fn) (
831  enum totem_configuration_type configuration_type,
832  const unsigned int *member_list, size_t member_list_entries,
833  const unsigned int *left_list, size_t left_list_entries,
834  const unsigned int *joined_list, size_t joined_list_entries,
835  const struct memb_ring_id *ring_id),
836  void (*waiting_trans_ack_cb_fn) (
837  int waiting_trans_ack))
838 {
839  struct totemsrp_instance *instance;
840  int res;
841 
842  instance = malloc (sizeof (struct totemsrp_instance));
843  if (instance == NULL) {
844  goto error_exit;
845  }
846 
847  totemsrp_instance_initialize (instance);
848 
849  instance->totemsrp_waiting_trans_ack_cb_fn = waiting_trans_ack_cb_fn;
850  instance->totemsrp_waiting_trans_ack_cb_fn (1);
851 
852  stats->srp = &instance->stats;
853  instance->stats.latest_token = 0;
854  instance->stats.earliest_token = 0;
855 
856  instance->totem_config = totem_config;
857 
858  /*
859  * Configure logging
860  */
869 
870  /*
871  * Configure totem store and load functions
872  */
874  instance->memb_ring_id_store = totem_config->totem_memb_ring_id_store;
875 
876  /*
877  * Initialize local variables for totemsrp
878  */
879  totemip_copy (&instance->mcast_address, &totem_config->interfaces[instance->lowest_active_if].mcast_addr);
880 
881  /*
882  * Display totem configuration
883  */
885  "Token Timeout (%d ms) retransmit timeout (%d ms)",
886  totem_config->token_timeout, totem_config->token_retransmit_timeout);
887  if (totem_config->token_warning) {
888  uint32_t token_warning_ms = totem_config->token_warning * totem_config->token_timeout / 100;
890  "Token warning every %d ms (%d%% of Token Timeout)",
891  token_warning_ms, totem_config->token_warning);
892  if (token_warning_ms < totem_config->token_retransmit_timeout)
894  "The token warning interval (%d ms) is less than the token retransmit timeout (%d ms) "
895  "which can lead to spurious token warnings. Consider increasing the token_warning parameter.",
896  token_warning_ms, totem_config->token_retransmit_timeout);
897  } else {
899  "Token warnings disabled");
900  }
902  "token hold (%d ms) retransmits before loss (%d retrans)",
903  totem_config->token_hold_timeout, totem_config->token_retransmits_before_loss_const);
905  "join (%d ms) send_join (%d ms) consensus (%d ms) merge (%d ms)",
906  totem_config->join_timeout,
907  totem_config->send_join_timeout,
908  totem_config->consensus_timeout,
909 
910  totem_config->merge_timeout);
912  "downcheck (%d ms) fail to recv const (%d msgs)",
913  totem_config->downcheck_timeout, totem_config->fail_to_recv_const);
915  "seqno unchanged const (%d rotations) Maximum network MTU %d", totem_config->seqno_unchanged_const, totem_config->net_mtu);
916 
918  "window size per rotation (%d messages) maximum messages per rotation (%d messages)",
919  totem_config->window_size, totem_config->max_messages);
920 
922  "missed count const (%d messages)",
923  totem_config->miss_count_const);
924 
926  "send threads (%d threads)", totem_config->threads);
927 
929  "heartbeat_failures_allowed (%d)", totem_config->heartbeat_failures_allowed);
931  "max_network_delay (%d ms)", totem_config->max_network_delay);
932 
933 
934  cs_queue_init (&instance->retrans_message_queue, RETRANS_MESSAGE_QUEUE_SIZE_MAX,
935  sizeof (struct message_item), instance->threaded_mode_enabled);
936 
937  sq_init (&instance->regular_sort_queue,
938  QUEUE_RTR_ITEMS_SIZE_MAX, sizeof (struct sort_queue_item), 0);
939 
940  sq_init (&instance->recovery_sort_queue,
941  QUEUE_RTR_ITEMS_SIZE_MAX, sizeof (struct sort_queue_item), 0);
942 
943  instance->totemsrp_poll_handle = poll_handle;
944 
945  instance->totemsrp_deliver_fn = deliver_fn;
946 
947  instance->totemsrp_confchg_fn = confchg_fn;
948  instance->use_heartbeat = 1;
949 
950  timer_function_pause_timeout (instance);
951 
952  if ( totem_config->heartbeat_failures_allowed == 0 ) {
954  "HeartBeat is Disabled. To enable set heartbeat_failures_allowed > 0");
955  instance->use_heartbeat = 0;
956  }
957 
958  if (instance->use_heartbeat) {
959  instance->heartbeat_timeout
960  = (totem_config->heartbeat_failures_allowed) * totem_config->token_retransmit_timeout
961  + totem_config->max_network_delay;
962 
963  if (instance->heartbeat_timeout >= totem_config->token_timeout) {
965  "total heartbeat_timeout (%d ms) is not less than token timeout (%d ms)",
966  instance->heartbeat_timeout,
967  totem_config->token_timeout);
969  "heartbeat_timeout = heartbeat_failures_allowed * token_retransmit_timeout + max_network_delay");
971  "heartbeat timeout should be less than the token timeout. Heartbeat is disabled!!");
972  instance->use_heartbeat = 0;
973  }
974  else {
976  "total heartbeat_timeout (%d ms)", instance->heartbeat_timeout);
977  }
978  }
979 
980  res = totemnet_initialize (
981  poll_handle,
982  &instance->totemnet_context,
983  totem_config,
984  stats->srp,
985  instance,
986  main_deliver_fn,
987  main_iface_change_fn,
988  totempg_mtu_changed,
989  target_set_completed);
990  if (res == -1) {
991  goto error_exit;
992  }
993 
994  instance->my_id.nodeid = instance->totem_config->interfaces[instance->lowest_active_if].boundto.nodeid;
995 
996  /*
997  * Must have net_mtu adjusted by totemnet_initialize first
998  */
999  cs_queue_init (&instance->new_message_queue,
1001  sizeof (struct message_item), instance->threaded_mode_enabled);
1002 
1003  cs_queue_init (&instance->new_message_queue_trans,
1005  sizeof (struct message_item), instance->threaded_mode_enabled);
1006 
1008  &instance->token_recv_event_handle,
1010  0,
1011  token_event_stats_collector,
1012  instance);
1014  &instance->token_sent_event_handle,
1016  0,
1017  token_event_stats_collector,
1018  instance);
1019  *srp_context = instance;
1020  return (0);
1021 
1022 error_exit:
1023  return (-1);
1024 }
1025 
/*
 * Tear down a totemsrp instance (presumably totemsrp_finalize — the
 * opening signature line, orig 1026, is absent from this listing):
 * announce departure with a leave message, finalize the network layer,
 * free all message/sort queues and finally the instance itself.
 */
1027  void *srp_context)
1028 {
1029  struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
1030 
1031  memb_leave_message_send (instance);
1032  totemnet_finalize (instance->totemnet_context);
1033  cs_queue_free (&instance->new_message_queue);
1034  cs_queue_free (&instance->new_message_queue_trans);
1035  cs_queue_free (&instance->retrans_message_queue);
1036  sq_free (&instance->regular_sort_queue);
1037  sq_free (&instance->recovery_sort_queue);
1038  free (instance);
1039 }
1040 
1041 /*
1042  * Return configured interfaces. interfaces is array of totem_ip addresses allocated by caller,
1043  * with interaces_size number of items. iface_count is final number of interfaces filled by this
1044  * function.
1045  *
1046  * Function returns 0 on success, otherwise if interfaces array is not big enough, -2 is returned,
1047  * and if interface was not found, -1 is returned.
1048  */
1050  void *srp_context,
1051  unsigned int nodeid,
1052  unsigned int *interface_id,
1053  struct totem_ip_address *interfaces,
1054  unsigned int interfaces_size,
1055  char ***status,
1056  unsigned int *iface_count)
1057 {
1058  struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
1059  struct totem_ip_address *iface_ptr = interfaces;
1060  int res = 0;
1061  int i,n;
1062  int num_ifs = 0;
1063 
1064  memset(interfaces, 0, sizeof(struct totem_ip_address) * interfaces_size);
1065  *iface_count = INTERFACE_MAX;
1066 
1067  for (i=0; i<INTERFACE_MAX; i++) {
1068  for (n=0; n < instance->totem_config->interfaces[i].member_count; n++) {
1069  if (instance->totem_config->interfaces[i].configured &&
1070  instance->totem_config->interfaces[i].member_list[n].nodeid == nodeid) {
1071  memcpy(iface_ptr, &instance->totem_config->interfaces[i].member_list[n], sizeof(struct totem_ip_address));
1072  interface_id[num_ifs] = i;
1073  iface_ptr++;
1074  if (++num_ifs > interfaces_size) {
1075  res = -2;
1076  break;
1077  }
1078  }
1079  }
1080  }
1081 
1082  totemnet_ifaces_get(instance->totemnet_context, status, iface_count);
1083  *iface_count = num_ifs;
1084  return (res);
1085 }
1086 
/*
 * Change the crypto cipher/hash configuration at runtime (presumably
 * totemsrp_crypto_set — the opening signature line, orig 1087, is
 * absent from this listing).  Delegates to the network layer and
 * returns its result.
 */
1088  void *srp_context,
1089  const char *cipher_type,
1090  const char *hash_type)
1091 {
1092  struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
1093  int res;
1094 
1095  res = totemnet_crypto_set(instance->totemnet_context, cipher_type, hash_type);
1096 
1097  return (res);
1098 }
1099 
1100 
/*
 * Return the local node id of this totemsrp instance (presumably
 * totemsrp_my_nodeid_get — the opening signature line, orig 1101, is
 * absent from this listing).
 */
1102  void *srp_context)
1103 {
1104  struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
1105  unsigned int res;
1106 
1107  res = instance->my_id.nodeid;
1108 
1109  return (res);
1110 }
1111 
/*
 * Return the address family (AF_INET/AF_INET6) of the lowest active
 * interface this instance is bound to (presumably
 * totemsrp_my_family_get — the opening signature line, orig 1112, is
 * absent from this listing).
 */
1113  void *srp_context)
1114 {
1115  struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
1116  int res;
1117 
1118  res = instance->totem_config->interfaces[instance->lowest_active_if].boundto.family;
1119 
1120  return (res);
1121 }
1122 
1123 
1124 /*
1125  * Set operations for use by the membership algorithm
1126  */
1127 static int srp_addr_equal (const struct srp_addr *a, const struct srp_addr *b)
1128 {
1129  if (a->nodeid == b->nodeid) {
1130  return 1;
1131  }
1132  return 0;
1133 }
1134 
1135 static void srp_addr_copy (struct srp_addr *dest, const struct srp_addr *src)
1136 {
1137  dest->nodeid = src->nodeid;
1138 }
1139 
1140 static void srp_addr_to_nodeid (
1141  struct totemsrp_instance *instance,
1142  unsigned int *nodeid_out,
1143  struct srp_addr *srp_addr_in,
1144  unsigned int entries)
1145 {
1146  unsigned int i;
1147 
1148  for (i = 0; i < entries; i++) {
1149  nodeid_out[i] = srp_addr_in[i].nodeid;
1150  }
1151 }
1152 
1153 static void srp_addr_copy_endian_convert (struct srp_addr *out, const struct srp_addr *in)
1154 {
1155  out->nodeid = swab32 (in->nodeid);
1156 }
1157 
1158 static void memb_consensus_reset (struct totemsrp_instance *instance)
1159 {
1160  instance->consensus_list_entries = 0;
1161 }
1162 
1163 static void memb_set_subtract (
1164  struct srp_addr *out_list, int *out_list_entries,
1165  struct srp_addr *one_list, int one_list_entries,
1166  struct srp_addr *two_list, int two_list_entries)
1167 {
1168  int found = 0;
1169  int i;
1170  int j;
1171 
1172  *out_list_entries = 0;
1173 
1174  for (i = 0; i < one_list_entries; i++) {
1175  for (j = 0; j < two_list_entries; j++) {
1176  if (srp_addr_equal (&one_list[i], &two_list[j])) {
1177  found = 1;
1178  break;
1179  }
1180  }
1181  if (found == 0) {
1182  srp_addr_copy (&out_list[*out_list_entries], &one_list[i]);
1183  *out_list_entries = *out_list_entries + 1;
1184  }
1185  found = 0;
1186  }
1187 }
1188 
1189 /*
1190  * Set consensus for a specific processor
1191  */
/*
 * Set consensus for a specific processor
 *
 * Records that 'addr' has reached consensus.  If the processor is
 * already in the list its entry is refreshed in place; otherwise the
 * search loop terminates with i == consensus_list_entries, so the
 * copy below appends a new entry and the count is bumped.
 *
 * NOTE(review): no bounds check against the consensus_list capacity;
 * presumably bounded by PROCESSOR_COUNT_MAX elsewhere -- confirm.
 */
static void memb_consensus_set (
	struct totemsrp_instance *instance,
	const struct srp_addr *addr)
{
	int found = 0;
	int i;

	for (i = 0; i < instance->consensus_list_entries; i++) {
		if (srp_addr_equal(addr, &instance->consensus_list[i].addr)) {
			found = 1;
			break; /* found entry */
		}
	}
	/* i now indexes either the matching entry or the append slot */
	srp_addr_copy (&instance->consensus_list[i].addr, addr);
	instance->consensus_list[i].set = 1;
	if (found == 0) {
		instance->consensus_list_entries++;
	}
	return;
}
1212 
1213 /*
1214  * Is consensus set for a specific processor
1215  */
1216 static int memb_consensus_isset (
1217  struct totemsrp_instance *instance,
1218  const struct srp_addr *addr)
1219 {
1220  int i;
1221 
1222  for (i = 0; i < instance->consensus_list_entries; i++) {
1223  if (srp_addr_equal (addr, &instance->consensus_list[i].addr)) {
1224  return (instance->consensus_list[i].set);
1225  }
1226  }
1227  return (0);
1228 }
1229 
1230 /*
1231  * Is consensus agreed upon based upon consensus database
1232  */
/*
 * Is consensus agreed upon based upon consensus database
 *
 * Builds the candidate membership (my_proc_list minus my_failed_list)
 * and returns 1 only when every candidate has consensus set.  When
 * failed_to_recv is flagged the (possibly empty) candidate list is
 * irrelevant -- a single-node ring will be formed -- so the function
 * returns before the sanity assert below.
 */
static int memb_consensus_agreed (
	struct totemsrp_instance *instance)
{
	struct srp_addr token_memb[PROCESSOR_COUNT_MAX];
	int token_memb_entries = 0;
	int agreed = 1;
	int i;

	memb_set_subtract (token_memb, &token_memb_entries,
		instance->my_proc_list, instance->my_proc_list_entries,
		instance->my_failed_list, instance->my_failed_list_entries);

	for (i = 0; i < token_memb_entries; i++) {
		if (memb_consensus_isset (instance, &token_memb[i]) == 0) {
			agreed = 0;
			break;
		}
	}

	if (agreed && instance->failed_to_recv == 1) {
		/*
		 * Both nodes agreed on our failure. We don't care how many proc list items left because we
		 * will create single ring anyway.
		 */

		return (agreed);
	}

	/* A node is always a member of its own proc list */
	assert (token_memb_entries >= 1);

	return (agreed);
}
1265 
/*
 * Collect into no_consensus_list every processor from my_proc_list
 * that has not yet reached consensus.
 *
 * NOTE(review): the comparison_list/comparison_list_entries parameters
 * are accepted but never read -- the loop iterates my_proc_list
 * directly.  The sole caller passes my_proc_list anyway, so behavior
 * is unchanged, but the signature is misleading.
 */
static void memb_consensus_notset (
	struct totemsrp_instance *instance,
	struct srp_addr *no_consensus_list,
	int *no_consensus_list_entries,
	struct srp_addr *comparison_list,
	int comparison_list_entries)
{
	int i;

	*no_consensus_list_entries = 0;

	for (i = 0; i < instance->my_proc_list_entries; i++) {
		if (memb_consensus_isset (instance, &instance->my_proc_list[i]) == 0) {
			srp_addr_copy (&no_consensus_list[*no_consensus_list_entries], &instance->my_proc_list[i]);
			*no_consensus_list_entries = *no_consensus_list_entries + 1;
		}
	}
}
1284 
1285 /*
1286  * Is set1 equal to set2 Entries can be in different orders
1287  */
1288 static int memb_set_equal (
1289  struct srp_addr *set1, int set1_entries,
1290  struct srp_addr *set2, int set2_entries)
1291 {
1292  int i;
1293  int j;
1294 
1295  int found = 0;
1296 
1297  if (set1_entries != set2_entries) {
1298  return (0);
1299  }
1300  for (i = 0; i < set2_entries; i++) {
1301  for (j = 0; j < set1_entries; j++) {
1302  if (srp_addr_equal (&set1[j], &set2[i])) {
1303  found = 1;
1304  break;
1305  }
1306  }
1307  if (found == 0) {
1308  return (0);
1309  }
1310  found = 0;
1311  }
1312  return (1);
1313 }
1314 
1315 /*
1316  * Is subset fully contained in fullset
1317  */
1318 static int memb_set_subset (
1319  const struct srp_addr *subset, int subset_entries,
1320  const struct srp_addr *fullset, int fullset_entries)
1321 {
1322  int i;
1323  int j;
1324  int found = 0;
1325 
1326  if (subset_entries > fullset_entries) {
1327  return (0);
1328  }
1329  for (i = 0; i < subset_entries; i++) {
1330  for (j = 0; j < fullset_entries; j++) {
1331  if (srp_addr_equal (&subset[i], &fullset[j])) {
1332  found = 1;
1333  }
1334  }
1335  if (found == 0) {
1336  return (0);
1337  }
1338  found = 0;
1339  }
1340  return (1);
1341 }
1342 /*
1343  * merge subset into fullset taking care not to add duplicates
1344  */
1345 static void memb_set_merge (
1346  const struct srp_addr *subset, int subset_entries,
1347  struct srp_addr *fullset, int *fullset_entries)
1348 {
1349  int found = 0;
1350  int i;
1351  int j;
1352 
1353  for (i = 0; i < subset_entries; i++) {
1354  for (j = 0; j < *fullset_entries; j++) {
1355  if (srp_addr_equal (&fullset[j], &subset[i])) {
1356  found = 1;
1357  break;
1358  }
1359  }
1360  if (found == 0) {
1361  srp_addr_copy (&fullset[*fullset_entries], &subset[i]);
1362  *fullset_entries = *fullset_entries + 1;
1363  }
1364  found = 0;
1365  }
1366  return;
1367 }
1368 
/*
 * Intersect set1 and set2, additionally requiring that the ring id
 * recorded for the set1 member (set1_ring_ids is parallel to set1)
 * matches old_ring_id.  Qualifying members are appended to 'and';
 * *and_entries receives the result count.
 */
static void memb_set_and_with_ring_id (
	struct srp_addr *set1,
	struct memb_ring_id *set1_ring_ids,
	int set1_entries,
	struct srp_addr *set2,
	int set2_entries,
	struct memb_ring_id *old_ring_id,
	struct srp_addr *and,
	int *and_entries)
{
	int i;
	int j;
	int found = 0;

	*and_entries = 0;

	for (i = 0; i < set2_entries; i++) {
		for (j = 0; j < set1_entries; j++) {
			if (srp_addr_equal (&set1[j], &set2[i])) {
				/* member of both sets; accept only if it was on the old ring */
				if (memcmp (&set1_ring_ids[j], old_ring_id, sizeof (struct memb_ring_id)) == 0) {
					found = 1;
				}
				break;
			}
		}
		if (found) {
			/* j still indexes the matching set1 entry after the break */
			srp_addr_copy (&and[*and_entries], &set1[j]);
			*and_entries = *and_entries + 1;
		}
		found = 0;
	}
	return;
}
1402 
1403 static void memb_set_log(
1404  struct totemsrp_instance *instance,
1405  int level,
1406  const char *string,
1407  struct srp_addr *list,
1408  int list_entries)
1409 {
1410  char int_buf[32];
1411  char list_str[512];
1412  int i;
1413 
1414  memset(list_str, 0, sizeof(list_str));
1415 
1416  for (i = 0; i < list_entries; i++) {
1417  if (i == 0) {
1418  snprintf(int_buf, sizeof(int_buf), "%u", list[i].nodeid);
1419  } else {
1420  snprintf(int_buf, sizeof(int_buf), ",%u", list[i].nodeid);
1421  }
1422 
1423  if (strlen(list_str) + strlen(int_buf) >= sizeof(list_str)) {
1424  break ;
1425  }
1426  strcat(list_str, int_buf);
1427  }
1428 
1429  log_printf(level, "List '%s' contains %d entries: %s", string, list_entries, list_str);
1430 }
1431 
/*
 * Empty the list of nodes that announced an orderly leave, zeroing
 * the backing array as well.
 */
static void my_leave_memb_clear(
	struct totemsrp_instance *instance)
{
	memset(instance->my_leave_memb_list, 0, sizeof(instance->my_leave_memb_list));
	instance->my_leave_memb_entries = 0;
}
1438 
1439 static unsigned int my_leave_memb_match(
1440  struct totemsrp_instance *instance,
1441  unsigned int nodeid)
1442 {
1443  int i;
1444  unsigned int ret = 0;
1445 
1446  for (i = 0; i < instance->my_leave_memb_entries; i++){
1447  if (instance->my_leave_memb_list[i] == nodeid){
1448  ret = nodeid;
1449  break;
1450  }
1451  }
1452  return ret;
1453 }
1454 
/*
 * Record nodeid in the LEAVE list (nodes that announced an orderly
 * shutdown) unless it is already present.
 *
 * NOTE(review): the capacity test uses PROCESSOR_COUNT_MAX - 1, which
 * leaves the last array slot permanently unused -- looks like an
 * off-by-one; confirm against the array declaration.
 * NOTE(review): the else branch below lost its opening
 * log_printf(instance->totemsrp_log_level_..., line in this extract;
 * restore from upstream.
 */
static void my_leave_memb_set(
	struct totemsrp_instance *instance,
	unsigned int nodeid)
{
	int i, found = 0;
	for (i = 0; i < instance->my_leave_memb_entries; i++){
		if (instance->my_leave_memb_list[i] == nodeid){
			found = 1;
			break;
		}
	}
	if (found == 1) {
		return;
	}
	if (instance->my_leave_memb_entries < (PROCESSOR_COUNT_MAX - 1)) {
		instance->my_leave_memb_list[instance->my_leave_memb_entries] = nodeid;
		instance->my_leave_memb_entries++;
	} else {
		"Cannot set LEAVE nodeid=%d", nodeid);
	}
}
1477 
1478 
/*
 * Allocate a message buffer from the network layer's pool.
 * Ownership passes to the caller; release via totemsrp_buffer_release.
 */
static void *totemsrp_buffer_alloc (struct totemsrp_instance *instance)
{
	assert (instance != NULL);
	return totemnet_buffer_alloc (instance->totemnet_context);
}
1484 
/*
 * Return a buffer obtained from totemsrp_buffer_alloc to the
 * network layer's pool.
 */
static void totemsrp_buffer_release (struct totemsrp_instance *instance, void *ptr)
{
	assert (instance != NULL);
	totemnet_buffer_release (instance->totemnet_context, ptr);
}
1490 
/*
 * (Re)arm the token retransmit timer: delete any pending instance and
 * schedule timer_function_token_retransmit_timeout after
 * token_retransmit_timeout ms.
 *
 * NOTE(review): the qb_loop_timer_del() and qb_loop_timer_add() calls
 * are each missing their trailing
 * &instance->timer_orf_token_retransmit_timeout argument line in this
 * extract -- restore from upstream before building.
 */
static void reset_token_retransmit_timeout (struct totemsrp_instance *instance)
{
	int32_t res;

	qb_loop_timer_del (instance->totemsrp_poll_handle,
	res = qb_loop_timer_add (instance->totemsrp_poll_handle,
		QB_LOOP_MED,
		instance->totem_config->token_retransmit_timeout*QB_TIME_NS_IN_MSEC,
		(void *)instance,
		timer_function_token_retransmit_timeout,
	if (res != 0) {
		log_printf(instance->totemsrp_log_level_error, "reset_token_retransmit_timeout - qb_loop_timer_add error : %d", res);
	}

}
1508 
1509 static void start_merge_detect_timeout (struct totemsrp_instance *instance)
1510 {
1511  int32_t res;
1512 
1513  if (instance->my_merge_detect_timeout_outstanding == 0) {
1514  res = qb_loop_timer_add (instance->totemsrp_poll_handle,
1515  QB_LOOP_MED,
1516  instance->totem_config->merge_timeout*QB_TIME_NS_IN_MSEC,
1517  (void *)instance,
1518  timer_function_merge_detect_timeout,
1519  &instance->timer_merge_detect_timeout);
1520  if (res != 0) {
1521  log_printf(instance->totemsrp_log_level_error, "start_merge_detect_timeout - qb_loop_timer_add error : %d", res);
1522  }
1523 
1524  instance->my_merge_detect_timeout_outstanding = 1;
1525  }
1526 }
1527 
/*
 * Stop the merge-detect timer.
 *
 * NOTE(review): upstream also clears
 * instance->my_merge_detect_timeout_outstanding here; that statement
 * appears to have been dropped from this extract -- restore it.
 */
static void cancel_merge_detect_timeout (struct totemsrp_instance *instance)
{
	qb_loop_timer_del (instance->totemsrp_poll_handle, instance->timer_merge_detect_timeout);
}
1533 
1534 /*
1535  * ring_state_* is used to save and restore the sort queue
1536  * state when a recovery operation fails (and enters gather)
1537  */
/*
 * ring_state_* is used to save and restore the sort queue
 * state when a recovery operation fails (and enters gather).
 *
 * Save the current ring id and aru exactly once per recovery attempt
 * (the saved flag prevents overwriting an earlier snapshot).
 *
 * NOTE(review): this extract is missing the line(s) between the aru
 * save and the format string below -- upstream also saves
 * my_high_seq_received and opens a log_printf(...) call here.
 */
static void old_ring_state_save (struct totemsrp_instance *instance)
{
	if (instance->old_ring_state_saved == 0) {
		instance->old_ring_state_saved = 1;
		memcpy (&instance->my_old_ring_id, &instance->my_ring_id,
			sizeof (struct memb_ring_id));
		instance->old_ring_state_aru = instance->my_aru;
		"Saving state aru %x high seq received %x",
		instance->my_aru, instance->my_high_seq_received);
	}
}
1551 
/*
 * Roll my_aru back to the snapshot taken by old_ring_state_save.
 *
 * NOTE(review): the log_printf(...) opening line is missing from this
 * extract -- restore from upstream.
 */
static void old_ring_state_restore (struct totemsrp_instance *instance)
{
	instance->my_aru = instance->old_ring_state_aru;
	"Restoring instance->my_aru %x my high seq received %x",
	instance->my_aru, instance->my_high_seq_received);
}
1560 
/*
 * Discard the saved ring snapshot so the next recovery attempt takes
 * a fresh one.
 *
 * NOTE(review): the log_printf(...) opening line is missing from this
 * extract -- restore from upstream.
 */
static void old_ring_state_reset (struct totemsrp_instance *instance)
{
	"Resetting old ring state");
	instance->old_ring_state_saved = 0;
}
1567 
1568 static void reset_pause_timeout (struct totemsrp_instance *instance)
1569 {
1570  int32_t res;
1571 
1572  qb_loop_timer_del (instance->totemsrp_poll_handle, instance->timer_pause_timeout);
1573  res = qb_loop_timer_add (instance->totemsrp_poll_handle,
1574  QB_LOOP_MED,
1575  instance->totem_config->token_timeout * QB_TIME_NS_IN_MSEC / 5,
1576  (void *)instance,
1577  timer_function_pause_timeout,
1578  &instance->timer_pause_timeout);
1579  if (res != 0) {
1580  log_printf(instance->totemsrp_log_level_error, "reset_pause_timeout - qb_loop_timer_add error : %d", res);
1581  }
1582 }
1583 
1584 static void reset_token_warning (struct totemsrp_instance *instance) {
1585  int32_t res;
1586 
1587  qb_loop_timer_del (instance->totemsrp_poll_handle, instance->timer_orf_token_warning);
1588  res = qb_loop_timer_add (instance->totemsrp_poll_handle,
1589  QB_LOOP_MED,
1590  instance->totem_config->token_warning * instance->totem_config->token_timeout / 100 * QB_TIME_NS_IN_MSEC,
1591  (void *)instance,
1592  timer_function_orf_token_warning,
1593  &instance->timer_orf_token_warning);
1594  if (res != 0) {
1595  log_printf(instance->totemsrp_log_level_error, "reset_token_warning - qb_loop_timer_add error : %d", res);
1596  }
1597 }
1598 
1599 static void reset_token_timeout (struct totemsrp_instance *instance) {
1600  int32_t res;
1601 
1602  qb_loop_timer_del (instance->totemsrp_poll_handle, instance->timer_orf_token_timeout);
1603  res = qb_loop_timer_add (instance->totemsrp_poll_handle,
1604  QB_LOOP_MED,
1605  instance->totem_config->token_timeout*QB_TIME_NS_IN_MSEC,
1606  (void *)instance,
1607  timer_function_orf_token_timeout,
1608  &instance->timer_orf_token_timeout);
1609  if (res != 0) {
1610  log_printf(instance->totemsrp_log_level_error, "reset_token_timeout - qb_loop_timer_add error : %d", res);
1611  }
1612 
1613  if (instance->totem_config->token_warning)
1614  reset_token_warning(instance);
1615 }
1616 
1617 static void reset_heartbeat_timeout (struct totemsrp_instance *instance) {
1618  int32_t res;
1619 
1620  qb_loop_timer_del (instance->totemsrp_poll_handle, instance->timer_heartbeat_timeout);
1621  res = qb_loop_timer_add (instance->totemsrp_poll_handle,
1622  QB_LOOP_MED,
1623  instance->heartbeat_timeout*QB_TIME_NS_IN_MSEC,
1624  (void *)instance,
1625  timer_function_heartbeat_timeout,
1626  &instance->timer_heartbeat_timeout);
1627  if (res != 0) {
1628  log_printf(instance->totemsrp_log_level_error, "reset_heartbeat_timeout - qb_loop_timer_add error : %d", res);
1629  }
1630 }
1631 
1632 
/* Stop the token-warning timer. */
static void cancel_token_warning (struct totemsrp_instance *instance) {
	qb_loop_timer_del (instance->totemsrp_poll_handle, instance->timer_orf_token_warning);
}
1636 
1637 static void cancel_token_timeout (struct totemsrp_instance *instance) {
1638  qb_loop_timer_del (instance->totemsrp_poll_handle, instance->timer_orf_token_timeout);
1639 
1640  if (instance->totem_config->token_warning)
1641  cancel_token_warning(instance);
1642 }
1643 
/* Stop the heartbeat-failure timer. */
static void cancel_heartbeat_timeout (struct totemsrp_instance *instance) {
	qb_loop_timer_del (instance->totemsrp_poll_handle, instance->timer_heartbeat_timeout);
}
1647 
/* Stop the token retransmit timer. */
static void cancel_token_retransmit_timeout (struct totemsrp_instance *instance)
{
	qb_loop_timer_del (instance->totemsrp_poll_handle, instance->timer_orf_token_retransmit_timeout);
}
1652 
/*
 * Arm the token-hold retransmit timer (fires after token_hold_timeout
 * ms); failure to schedule is logged but otherwise ignored.
 *
 * NOTE(review): the qb_loop_timer_add() call is missing its trailing
 * &instance->timer_orf_token_hold_retransmit_timeout argument line in
 * this extract -- restore from upstream before building.
 */
static void start_token_hold_retransmit_timeout (struct totemsrp_instance *instance)
{
	int32_t res;

	res = qb_loop_timer_add (instance->totemsrp_poll_handle,
		QB_LOOP_MED,
		instance->totem_config->token_hold_timeout*QB_TIME_NS_IN_MSEC,
		(void *)instance,
		timer_function_token_hold_retransmit_timeout,
	if (res != 0) {
		log_printf(instance->totemsrp_log_level_error, "start_token_hold_retransmit_timeout - qb_loop_timer_add error : %d", res);
	}
}
1667 
/*
 * Stop the token-hold retransmit timer.
 *
 * NOTE(review): the qb_loop_timer_del() call is missing its timer
 * handle argument line in this extract -- restore from upstream.
 */
static void cancel_token_hold_retransmit_timeout (struct totemsrp_instance *instance)
{
	qb_loop_timer_del (instance->totemsrp_poll_handle,
}
1673 
/*
 * Consensus timer expiry while gathering.  If the surviving candidate
 * set already agrees, reset the consensus database to just this node
 * and rearm the token timer.  Otherwise declare every processor that
 * failed to reach consensus as failed (merging them into
 * my_failed_list) and re-enter GATHER with the reduced candidate set.
 */
static void memb_state_consensus_timeout_expired (
	struct totemsrp_instance *instance)
{
	struct srp_addr no_consensus_list[PROCESSOR_COUNT_MAX];
	int no_consensus_list_entries;

	instance->stats.consensus_timeouts++;
	if (memb_consensus_agreed (instance)) {
		memb_consensus_reset (instance);

		memb_consensus_set (instance, &instance->my_id);

		reset_token_timeout (instance); // REVIEWED
	} else {
		memb_consensus_notset (
			instance,
			no_consensus_list,
			&no_consensus_list_entries,
			instance->my_proc_list,
			instance->my_proc_list_entries);

		memb_set_merge (no_consensus_list, no_consensus_list_entries,
			instance->my_failed_list, &instance->my_failed_list_entries);
		memb_state_gather_enter (instance, TOTEMSRP_GSFROM_CONSENSUS_TIMEOUT);
	}
}
1700 
1701 static void memb_join_message_send (struct totemsrp_instance *instance);
1702 
1703 static void memb_merge_detect_transmit (struct totemsrp_instance *instance);
1704 
1705 /*
1706  * Timers used for various states of the membership algorithm
1707  */
1708 static void timer_function_pause_timeout (void *data)
1709 {
1710  struct totemsrp_instance *instance = data;
1711 
1712  instance->pause_timestamp = qb_util_nano_current_get ();
1713  reset_pause_timeout (instance);
1714 }
1715 
/*
 * Token lost while in RECOVERY: roll back to the saved pre-recovery
 * ring state, re-enter GATHER, and count the event.
 */
static void memb_recovery_state_token_loss (struct totemsrp_instance *instance)
{
	old_ring_state_restore (instance);
	memb_state_gather_enter (instance, TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_RECOVERY_STATE);
	instance->stats.recovery_token_lost++;
}
1722 
/*
 * Warn that the token has been absent for longer than the configured
 * warning fraction of the token timeout, then rearm the warning timer.
 *
 * NOTE(review): the log_printf(...) opening line before the format
 * string is missing from this extract -- restore from upstream.
 */
static void timer_function_orf_token_warning (void *data)
{
	struct totemsrp_instance *instance = data;
	uint64_t tv_diff;

	/* need to protect against the case where token_warning is set to 0 dynamically */
	if (instance->totem_config->token_warning) {
		tv_diff = qb_util_nano_current_get () / QB_TIME_NS_IN_MSEC -
			instance->stats.token[instance->stats.latest_token].rx;
		"Token has not been received in %d ms ", (unsigned int) tv_diff);
		reset_token_warning(instance);
	} else {
		cancel_token_warning(instance);
	}
}
1739 
/*
 * Token-loss timer expiry: dispatch the loss according to the current
 * membership state.  In all states this drives the protocol back into
 * GATHER; in RECOVERY the in-flight token is additionally discarded.
 *
 * NOTE(review): this extract is missing the `case MEMB_STATE_OPERATIONAL:`
 * label and several log_printf(...) opening lines (the dangling format
 * strings below) -- restore from upstream before building.
 */
static void timer_function_orf_token_timeout (void *data)
{
	struct totemsrp_instance *instance = data;

	switch (instance->memb_state) {
		"The token was lost in the OPERATIONAL state.");
		"A processor failed, forming new configuration.");
		memb_state_gather_enter (instance, TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_OPERATIONAL_STATE);
		instance->stats.operational_token_lost++;
		break;

	case MEMB_STATE_GATHER:
		"The consensus timeout expired.");
		memb_state_consensus_timeout_expired (instance);
		memb_state_gather_enter (instance, TOTEMSRP_GSFROM_THE_CONSENSUS_TIMEOUT_EXPIRED);
		instance->stats.gather_token_lost++;
		break;

	case MEMB_STATE_COMMIT:
		"The token was lost in the COMMIT state.");
		memb_state_gather_enter (instance, TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_COMMIT_STATE);
		instance->stats.commit_token_lost++;
		break;

	case MEMB_STATE_RECOVERY:
		"The token was lost in the RECOVERY state.");
		memb_recovery_state_token_loss (instance);
		instance->orf_token_discard = 1;
		break;
	}
}
1778 
/*
 * Heartbeat failure is treated exactly like a lost token: delegate to
 * the token-loss handler for the current state.
 *
 * NOTE(review): the log_printf(...) opening line is missing from this
 * extract -- restore from upstream.
 */
static void timer_function_heartbeat_timeout (void *data)
{
	struct totemsrp_instance *instance = data;
	"HeartBeat Timer expired Invoking token loss mechanism in state %d ", instance->memb_state);
	timer_function_orf_token_timeout(data);
}
1786 
/*
 * Join timer callback: while in GATHER or COMMIT, resend the JOIN
 * message and rearm ourselves so joins keep being advertised until
 * consensus is reached.  Firing in RECOVERY indicates a logic error.
 *
 * NOTE(review): this extract is missing the `case MEMB_STATE_OPERATIONAL:`
 * label that upstream places before MEMB_STATE_RECOVERY -- restore it.
 */
static void memb_timer_function_state_gather (void *data)
{
	struct totemsrp_instance *instance = data;
	int32_t res;

	switch (instance->memb_state) {
	case MEMB_STATE_RECOVERY:
		assert (0); /* this should never happen */
		break;
	case MEMB_STATE_GATHER:
	case MEMB_STATE_COMMIT:
		memb_join_message_send (instance);

		/*
		 * Restart the join timeout
		 */
		qb_loop_timer_del (instance->totemsrp_poll_handle, instance->memb_timer_state_gather_join_timeout);

		res = qb_loop_timer_add (instance->totemsrp_poll_handle,
			QB_LOOP_MED,
			instance->totem_config->join_timeout*QB_TIME_NS_IN_MSEC,
			(void *)instance,
			memb_timer_function_state_gather,
			&instance->memb_timer_state_gather_join_timeout);

		if (res != 0) {
			log_printf(instance->totemsrp_log_level_error, "memb_timer_function_state_gather - qb_loop_timer_add error : %d", res);
		}
		break;
	}
}
1819 
/*
 * Consensus timer callback while gathering: hand off to the shared
 * consensus-expiry logic.
 */
static void memb_timer_function_gather_consensus_timeout (void *data)
{
	struct totemsrp_instance *instance = data;
	memb_state_consensus_timeout_expired (instance);
}
1825 
/*
 * Migrate messages from the recovery sort queue to the regular sort
 * queue after recovery completes.  Only encapsulated messages whose
 * inner ring id matches this processor's old ring are transferred;
 * old_ring_state_high_seq_received is advanced to the highest
 * transferred sequence number.
 *
 * NOTE(review): several log_printf(...) opening lines are missing from
 * this extract (the dangling format strings below) -- restore from
 * upstream before building.
 */
static void deliver_messages_from_recovery_to_regular (struct totemsrp_instance *instance)
{
	unsigned int i;
	struct sort_queue_item *recovery_message_item;
	struct sort_queue_item regular_message_item;
	unsigned int range = 0;
	int res;
	void *ptr;
	struct mcast *mcast;

	"recovery to regular %x-%x", SEQNO_START_MSG + 1, instance->my_aru);

	range = instance->my_aru - SEQNO_START_MSG;
	/*
	 * Move messages from recovery to regular sort queue
	 */
// todo should i be initialized to 0 or 1 ?
	for (i = 1; i <= range; i++) {
		res = sq_item_get (&instance->recovery_sort_queue,
			i + SEQNO_START_MSG, &ptr);
		if (res != 0) {
			continue;
		}
		recovery_message_item = ptr;

		/*
		 * Convert recovery message into regular message
		 */
		mcast = recovery_message_item->mcast;
		if (mcast->header.encapsulated == MESSAGE_ENCAPSULATED) {
			/*
			 * Message is a recovery message encapsulated
			 * in a new ring message
			 */
			regular_message_item.mcast =
				(struct mcast *)(((char *)recovery_message_item->mcast) + sizeof (struct mcast));
			regular_message_item.msg_len =
				recovery_message_item->msg_len - sizeof (struct mcast);
			mcast = regular_message_item.mcast;
		} else {
			/*
			 * TODO this case shouldn't happen
			 */
			continue;
		}

		"comparing if ring id is for this processors old ring seqno %d",
		mcast->seq);

		/*
		 * Only add this message to the regular sort
		 * queue if it was originated with the same ring
		 * id as the previous ring
		 */
		if (memcmp (&instance->my_old_ring_id, &mcast->ring_id,
			sizeof (struct memb_ring_id)) == 0) {

			res = sq_item_inuse (&instance->regular_sort_queue, mcast->seq);
			if (res == 0) {
				sq_item_add (&instance->regular_sort_queue,
					&regular_message_item, mcast->seq);
				if (sq_lt_compare (instance->old_ring_state_high_seq_received, mcast->seq)) {
					instance->old_ring_state_high_seq_received = mcast->seq;
				}
			}
		} else {
			"-not adding msg with seq no %x", mcast->seq);
		}
	}
}
1899 
1900 /*
1901  * Change states in the state machine of the membership algorithm
1902  */
/*
 * Enter the OPERATIONAL state after recovery completes.
 *
 * Sequence (order-critical):
 *  1. migrate recovered messages and deliver pending old-ring messages
 *  2. compute joined/left lists and install the new membership
 *  3. deliver the transitional then the regular configuration to the
 *     application
 *  4. promote the recovery sort queue to the regular sort queue and
 *     release delivered messages
 *  5. log the new membership and record the ring id for the next
 *     configuration change
 *
 * NOTE(review): several call-opening lines are missing from this
 * extract -- the log_printf(...) openers before each dangling format
 * string, and the instance->totemsrp_confchg_fn (...) openers before
 * the two argument fragments delivering the transitional and regular
 * configurations.  Restore from upstream before building.
 */
static void memb_state_operational_enter (struct totemsrp_instance *instance)
{
	struct srp_addr joined_list[PROCESSOR_COUNT_MAX];
	int joined_list_entries = 0;
	unsigned int aru_save;
	unsigned int joined_list_totemip[PROCESSOR_COUNT_MAX];
	unsigned int trans_memb_list_totemip[PROCESSOR_COUNT_MAX];
	unsigned int new_memb_list_totemip[PROCESSOR_COUNT_MAX];
	unsigned int left_list[PROCESSOR_COUNT_MAX];
	unsigned int i;
	unsigned int res;
	char left_node_msg[1024];
	char joined_node_msg[1024];
	char failed_node_msg[1024];

	instance->originated_orf_token = 0;

	memb_consensus_reset (instance);

	old_ring_state_reset (instance);

	deliver_messages_from_recovery_to_regular (instance);

	"Delivering to app %x to %x",
	instance->my_high_delivered + 1, instance->old_ring_state_high_seq_received);

	/* Deliver old-ring messages against the saved aru, then restore */
	aru_save = instance->my_aru;
	instance->my_aru = instance->old_ring_state_aru;

	messages_deliver_to_app (instance, 0, instance->old_ring_state_high_seq_received);

	/*
	 * Calculate joined and left list
	 */
	memb_set_subtract (instance->my_left_memb_list,
		&instance->my_left_memb_entries,
		instance->my_memb_list, instance->my_memb_entries,
		instance->my_trans_memb_list, instance->my_trans_memb_entries);

	memb_set_subtract (joined_list, &joined_list_entries,
		instance->my_new_memb_list, instance->my_new_memb_entries,
		instance->my_trans_memb_list, instance->my_trans_memb_entries);

	/*
	 * Install new membership
	 */
	instance->my_memb_entries = instance->my_new_memb_entries;
	memcpy (&instance->my_memb_list, instance->my_new_memb_list,
		sizeof (struct srp_addr) * instance->my_memb_entries);
	instance->last_released = 0;
	instance->my_set_retrans_flg = 0;

	/*
	 * Deliver transitional configuration to application
	 */
	srp_addr_to_nodeid (instance, left_list, instance->my_left_memb_list,
		instance->my_left_memb_entries);
	srp_addr_to_nodeid (instance, trans_memb_list_totemip,
		instance->my_trans_memb_list, instance->my_trans_memb_entries);
		trans_memb_list_totemip, instance->my_trans_memb_entries,
		left_list, instance->my_left_memb_entries,
		0, 0, &instance->my_ring_id);
	instance->waiting_trans_ack = 1;
	instance->totemsrp_waiting_trans_ack_cb_fn (1);

// TODO we need to filter to ensure we only deliver those
// messages which are part of instance->my_deliver_memb
	messages_deliver_to_app (instance, 1, instance->old_ring_state_high_seq_received);

	instance->my_aru = aru_save;

	/*
	 * Deliver regular configuration to application
	 */
	srp_addr_to_nodeid (instance, new_memb_list_totemip,
		instance->my_new_memb_list, instance->my_new_memb_entries);
	srp_addr_to_nodeid (instance, joined_list_totemip, joined_list,
		joined_list_entries);
		new_memb_list_totemip, instance->my_new_memb_entries,
		0, 0,
		joined_list_totemip, joined_list_entries, &instance->my_ring_id);

	/*
	 * The recovery sort queue now becomes the regular
	 * sort queue. It is necessary to copy the state
	 * into the regular sort queue.
	 */
	sq_copy (&instance->regular_sort_queue, &instance->recovery_sort_queue);
	instance->my_last_aru = SEQNO_START_MSG;

	/* When making my_proc_list smaller, ensure that the
	 * now non-used entries are zero-ed out. There are some suspect
	 * assert's that assume that there is always 2 entries in the list.
	 * These fail when my_proc_list is reduced to 1 entry (and the
	 * valid [0] entry is the same as the 'unused' [1] entry).
	 */
	memset(instance->my_proc_list, 0,
		sizeof (struct srp_addr) * instance->my_proc_list_entries);

	instance->my_proc_list_entries = instance->my_new_memb_entries;
	memcpy (instance->my_proc_list, instance->my_new_memb_list,
		sizeof (struct srp_addr) * instance->my_memb_entries);

	instance->my_failed_list_entries = 0;
	/*
	 * TODO Not exactly to spec
	 *
	 * At the entry to this function all messages without a gap are
	 * deliered.
	 *
	 * This code throw away messages from the last gap in the sort queue
	 * to my_high_seq_received
	 *
	 * What should really happen is we should deliver all messages up to
	 * a gap, then delier the transitional configuration, then deliver
	 * the messages between the first gap and my_high_seq_received, then
	 * deliver a regular configuration, then deliver the regular
	 * configuration
	 *
	 * Unfortunately totempg doesn't appear to like this operating mode
	 * which needs more inspection
	 */
	/* Walk backwards from the highest received seq to the last message
	 * actually present in the regular sort queue */
	i = instance->my_high_seq_received + 1;
	do {
		void *ptr;

		i -= 1;
		res = sq_item_get (&instance->regular_sort_queue, i, &ptr);
		if (i == 0) {
			break;
		}
	} while (res);

	instance->my_high_delivered = i;

	/* Free the mcast payloads of everything now considered delivered */
	for (i = 0; i <= instance->my_high_delivered; i++) {
		void *ptr;

		res = sq_item_get (&instance->regular_sort_queue, i, &ptr);
		if (res == 0) {
			struct sort_queue_item *regular_message;

			regular_message = ptr;
			free (regular_message->mcast);
		}
	}
	sq_items_release (&instance->regular_sort_queue, instance->my_high_delivered);
	instance->last_released = instance->my_high_delivered;

	if (joined_list_entries) {
		int sptr = 0;
		sptr += snprintf(joined_node_msg, sizeof(joined_node_msg)-sptr, " joined:");
		for (i=0; i< joined_list_entries; i++) {
			sptr += snprintf(joined_node_msg+sptr, sizeof(joined_node_msg)-sptr, " %u", joined_list_totemip[i]);
		}
	}
	else {
		joined_node_msg[0] = '\0';
	}

	if (instance->my_left_memb_entries) {
		int sptr = 0;
		int sptr2 = 0;
		sptr += snprintf(left_node_msg, sizeof(left_node_msg)-sptr, " left:");
		for (i=0; i< instance->my_left_memb_entries; i++) {
			sptr += snprintf(left_node_msg+sptr, sizeof(left_node_msg)-sptr, " %u", left_list[i]);
		}
		/* Nodes that left without announcing a LEAVE are reported failed */
		for (i=0; i< instance->my_left_memb_entries; i++) {
			if (my_leave_memb_match(instance, left_list[i]) == 0) {
				if (sptr2 == 0) {
					sptr2 += snprintf(failed_node_msg, sizeof(failed_node_msg)-sptr2, " failed:");
				}
				/* NOTE(review): sizeof(left_node_msg) used while writing
				 * failed_node_msg; both are 1024 so harmless, but it
				 * should read sizeof(failed_node_msg) */
				sptr2 += snprintf(failed_node_msg+sptr2, sizeof(left_node_msg)-sptr2, " %u", left_list[i]);
			}
		}
		if (sptr2 == 0) {
			failed_node_msg[0] = '\0';
		}
	}
	else {
		left_node_msg[0] = '\0';
		failed_node_msg[0] = '\0';
	}

	my_leave_memb_clear(instance);

	"entering OPERATIONAL state.");
	"A new membership (%u:%lld) was formed. Members%s%s",
	instance->my_ring_id.rep,
	instance->my_ring_id.seq,
	joined_node_msg,
	left_node_msg);

	if (strlen(failed_node_msg)) {
		"Failed to receive the leave message.%s",
		failed_node_msg);
	}

	instance->memb_state = MEMB_STATE_OPERATIONAL;

	instance->stats.operational_entered++;
	instance->stats.continuous_gather = 0;

	instance->my_received_flg = 1;

	reset_pause_timeout (instance);

	/*
	 * Save ring id information from this configuration to determine
	 * which processors are transitioning from old regular configuration
	 * in to new regular configuration on the next configuration change
	 */
	memcpy (&instance->my_old_ring_id, &instance->my_ring_id,
		sizeof (struct memb_ring_id));

	return;
}
2126 
/*
 * Enter the GATHER state: discard any in-flight token, add ourselves
 * to the candidate processor list, advertise a JOIN, restart the join
 * and consensus timers, cancel the token/merge timers that only make
 * sense on an established ring, and reset the consensus database to
 * just this node.  gather_from records (for logging/statistics) which
 * event triggered the transition.
 */
static void memb_state_gather_enter (
	struct totemsrp_instance *instance,
	enum gather_state_from gather_from)
{
	int32_t res;

	instance->orf_token_discard = 1;

	instance->originated_orf_token = 0;

	memb_set_merge (
		&instance->my_id, 1,
		instance->my_proc_list, &instance->my_proc_list_entries);

	memb_join_message_send (instance);

	/*
	 * Restart the join timeout
	 */
	qb_loop_timer_del (instance->totemsrp_poll_handle, instance->memb_timer_state_gather_join_timeout);

	res = qb_loop_timer_add (instance->totemsrp_poll_handle,
		QB_LOOP_MED,
		instance->totem_config->join_timeout*QB_TIME_NS_IN_MSEC,
		(void *)instance,
		memb_timer_function_state_gather,
		&instance->memb_timer_state_gather_join_timeout);
	if (res != 0) {
		log_printf(instance->totemsrp_log_level_error, "memb_state_gather_enter - qb_loop_timer_add error(1) : %d", res);
	}

	/*
	 * Restart the consensus timeout
	 */
	qb_loop_timer_del (instance->totemsrp_poll_handle,
		instance->memb_timer_state_gather_consensus_timeout);

	res = qb_loop_timer_add (instance->totemsrp_poll_handle,
		QB_LOOP_MED,
		instance->totem_config->consensus_timeout*QB_TIME_NS_IN_MSEC,
		(void *)instance,
		memb_timer_function_gather_consensus_timeout,
		&instance->memb_timer_state_gather_consensus_timeout);
	if (res != 0) {
		log_printf(instance->totemsrp_log_level_error, "memb_state_gather_enter - qb_loop_timer_add error(2) : %d", res);
	}

	/*
	 * Cancel the token loss and token retransmission timeouts
	 */
	cancel_token_retransmit_timeout (instance); // REVIEWED
	cancel_token_timeout (instance); // REVIEWED
	cancel_merge_detect_timeout (instance);

	memb_consensus_reset (instance);

	memb_consensus_set (instance, &instance->my_id);

	log_printf (instance->totemsrp_log_level_debug,
		"entering GATHER state from %d(%s).",
		gather_from, gsfrom_to_msg(gather_from));

	instance->memb_state = MEMB_STATE_GATHER;
	instance->stats.gather_entered++;

	if (gather_from == TOTEMSRP_GSFROM_THE_CONSENSUS_TIMEOUT_EXPIRED) {
		/*
		 * State 3 means gather, so we are continuously gathering.
		 */
		instance->stats.continuous_gather++;
	}

	return;
}
2201 
2202 static void timer_function_token_retransmit_timeout (void *data);
2203 
/*
 * Callback invoked by totemnet once the token target has been set;
 * the commit token is only transmitted after the target is in place.
 */
static void target_set_completed (
	void *context)
{
	struct totemsrp_instance *inst;

	inst = (struct totemsrp_instance *)context;

	memb_state_commit_token_send (inst);
}
2212 
/*
 * Enter the COMMIT membership state: save the old ring state, update and
 * target the commit token, adopt the new ring id and arm the token timers.
 * The commit token itself is sent from the target_set_completed() callback.
 */
static void memb_state_commit_enter (
	struct totemsrp_instance *instance)
{
	/* Preserve old ring state so RECOVERY can decide what to re-originate */
	old_ring_state_save (instance);

	memb_state_commit_token_update (instance);

	memb_state_commit_token_target_set (instance);

	/* The GATHER-phase timers are no longer needed once committing */
	qb_loop_timer_del (instance->totemsrp_poll_handle, instance->memb_timer_state_gather_join_timeout);

	/* NOTE(review): one line lost in extraction here (likely zeroing the
	 * join timer handle) — confirm against upstream totemsrp.c. */

	qb_loop_timer_del (instance->totemsrp_poll_handle, instance->memb_timer_state_gather_consensus_timeout);

	/* NOTE(review): one line lost in extraction here (likely zeroing the
	 * consensus timer handle) — confirm against upstream. */

	/* Adopt the ring id carried by the commit token and persist it */
	memb_ring_id_set (instance, &instance->commit_token->ring_id);

	instance->memb_ring_id_store (&instance->my_ring_id, instance->my_id.nodeid);

	instance->token_ring_id_seq = instance->my_ring_id.seq;

	/* NOTE(review): the log_printf opener for this message was lost in
	 * extraction; only the format-string line remains. */
		"entering COMMIT state.");

	instance->memb_state = MEMB_STATE_COMMIT;
	reset_token_retransmit_timeout (instance); // REVIEWED
	reset_token_timeout (instance); // REVIEWED

	instance->stats.commit_entered++;
	instance->stats.continuous_gather = 0;

	/*
	 * reset all flow control variables since we are starting a new ring
	 */
	instance->my_trc = 0;
	instance->my_pbl = 0;
	instance->my_cbl = 0;
	/*
	 * commit token sent after callback that token target has been set
	 */
}
2256 
/*
 * Enter the RECOVERY membership state.  Builds the regular and transitional
 * configurations from the commit token, determines whether any member of the
 * old ring failed to receive all messages, and if so re-originates (copies)
 * the missing old-ring messages onto the retrans_message_queue so they can
 * be delivered on the new ring.
 *
 * NOTE(review): several log_printf openers and one totemnet call opener were
 * lost in extraction; the dangling format-string/argument lines below belong
 * to them — confirm against upstream totemsrp.c.
 */
static void memb_state_recovery_enter (
	struct totemsrp_instance *instance,
	struct memb_commit_token *commit_token)
{
	int i;
	int local_received_flg = 1;	/* cleared if any trans member missed messages */
	unsigned int low_ring_aru;	/* lowest aru over the old-ring members */
	unsigned int range = 0;
	unsigned int messages_originated = 0;
	const struct srp_addr *addr;
	struct memb_commit_token_memb_entry *memb_list;
	struct memb_ring_id my_new_memb_ring_id_list[PROCESSOR_COUNT_MAX];

	/* The address list and per-member entries trail the fixed token header */
	addr = (const struct srp_addr *)commit_token->end_of_commit_token;
	memb_list = (struct memb_commit_token_memb_entry *)(addr + commit_token->addr_entries);

		"entering RECOVERY state.");

	instance->orf_token_discard = 0;

	instance->my_high_ring_delivered = 0;

	/* Fresh queues for messages recovered/re-sent on the new ring */
	sq_reinit (&instance->recovery_sort_queue, SEQNO_START_MSG);
	cs_queue_reinit (&instance->retrans_message_queue);

	low_ring_aru = instance->old_ring_state_high_seq_received;

	memb_state_commit_token_send_recovery (instance, commit_token);

	instance->my_token_seq = SEQNO_START_TOKEN - 1;

	/*
	 * Build regular configuration
	 */
	/* NOTE(review): call opener lost in extraction (presumably a totemnet
	 * processor-count setter) — only its arguments remain. */
		instance->totemnet_context,
		commit_token->addr_entries);

	/*
	 * Build transitional configuration
	 */
	for (i = 0; i < instance->my_new_memb_entries; i++) {
		memcpy (&my_new_memb_ring_id_list[i],
			&memb_list[i].ring_id,
			sizeof (struct memb_ring_id));
	}
	/* trans membership = (new memb) AND (old memb) restricted to old ring id */
	memb_set_and_with_ring_id (
		instance->my_new_memb_list,
		my_new_memb_ring_id_list,
		instance->my_new_memb_entries,
		instance->my_memb_list,
		instance->my_memb_entries,
		&instance->my_old_ring_id,
		instance->my_trans_memb_list,
		&instance->my_trans_memb_entries);

	for (i = 0; i < instance->my_trans_memb_entries; i++) {
			"TRANS [%d] member %u:", i, instance->my_trans_memb_list[i].nodeid);
	}
	for (i = 0; i < instance->my_new_memb_entries; i++) {
			"position [%d] member %u:", i, addr[i].nodeid);
			"previous ring seq %llx rep %u",
			memb_list[i].ring_id.seq,
			memb_list[i].ring_id.rep);

			"aru %x high delivered %x received flag %d",
			memb_list[i].aru,
			memb_list[i].high_delivered,
			memb_list[i].received_flg);

	//	assert (totemip_print (&memb_list[i].ring_id.rep) != 0);
	}
	/*
	 * Determine if any received flag is false
	 */
	for (i = 0; i < commit_token->addr_entries; i++) {
		if (memb_set_subset (&instance->my_new_memb_list[i], 1,
			instance->my_trans_memb_list, instance->my_trans_memb_entries) &&

			memb_list[i].received_flg == 0) {
			/* A transitional member missed messages: deliver set is the
			 * whole transitional membership */
			instance->my_deliver_memb_entries = instance->my_trans_memb_entries;
			memcpy (instance->my_deliver_memb_list, instance->my_trans_memb_list,
				sizeof (struct srp_addr) * instance->my_trans_memb_entries);
			local_received_flg = 0;
			break;
		}
	}
	if (local_received_flg == 1) {
		goto no_originate;
	} /* Else originate messages if we should */

	/*
	 * Calculate my_low_ring_aru, instance->my_high_ring_delivered for the transitional membership
	 */
	for (i = 0; i < commit_token->addr_entries; i++) {
		if (memb_set_subset (&instance->my_new_memb_list[i], 1,
			instance->my_deliver_memb_list,
			instance->my_deliver_memb_entries) &&

		memcmp (&instance->my_old_ring_id,
			&memb_list[i].ring_id,
			sizeof (struct memb_ring_id)) == 0) {

			/* sq_lt_compare is sequence-wrap aware */
			if (sq_lt_compare (memb_list[i].aru, low_ring_aru)) {

				low_ring_aru = memb_list[i].aru;
			}
			if (sq_lt_compare (instance->my_high_ring_delivered, memb_list[i].high_delivered)) {
				instance->my_high_ring_delivered = memb_list[i].high_delivered;
			}
		}
	}

	/*
	 * Copy all old ring messages to instance->retrans_message_queue
	 */
	range = instance->old_ring_state_high_seq_received - low_ring_aru;
	if (range == 0) {
		/*
		 * No messages to copy
		 */
		goto no_originate;
	}
	assert (range < QUEUE_RTR_ITEMS_SIZE_MAX);

		"copying all old ring messages from %x-%x.",
		low_ring_aru + 1, instance->old_ring_state_high_seq_received);

	for (i = 1; i <= range; i++) {
		/* NOTE(review): declaration of sort_queue_item pointer appears to
		 * have been lost in extraction just above this loop body. */
		struct message_item message_item;
		void *ptr;
		int res;

		res = sq_item_get (&instance->regular_sort_queue,
			low_ring_aru + i, &ptr);
		if (res != 0) {
			continue;
		}
		sort_queue_item = ptr;
		messages_originated++;
		memset (&message_item, 0, sizeof (struct message_item));
		// TODO LEAK
		message_item.mcast = totemsrp_buffer_alloc (instance);
		assert (message_item.mcast);
		message_item.mcast->header.magic = TOTEM_MH_MAGIC;
		message_item.mcast->header.version = TOTEM_MH_VERSION;
		message_item.mcast->header.type = MESSAGE_TYPE_MCAST;
		srp_addr_copy (&message_item.mcast->system_from, &instance->my_id);

		message_item.mcast->header.nodeid = instance->my_id.nodeid;
		assert (message_item.mcast->header.nodeid);
		/* Re-originated copy carries the NEW ring id */
		memcpy (&message_item.mcast->ring_id, &instance->my_ring_id,
			sizeof (struct memb_ring_id));
		message_item.msg_len = sort_queue_item->msg_len + sizeof (struct mcast);
		memcpy (((char *)message_item.mcast) + sizeof (struct mcast),
			sort_queue_item->mcast,
			sort_queue_item->msg_len);
		cs_queue_item_add (&instance->retrans_message_queue, &message_item);
	}
		"Originated %d messages in RECOVERY.", messages_originated);
	goto originated;

no_originate:
		"Did not need to originate any messages in recovery.");

originated:
	/* Reset sequencing state for the new ring */
	instance->my_aru = SEQNO_START_MSG;
	instance->my_aru_count = 0;
	instance->my_seq_unchanged = 0;
	instance->my_install_seq = SEQNO_START_MSG;
	instance->last_released = SEQNO_START_MSG;

	reset_token_timeout (instance); // REVIEWED
	reset_token_retransmit_timeout (instance); // REVIEWED

	instance->memb_state = MEMB_STATE_RECOVERY;
	instance->stats.recovery_entered++;
	instance->stats.continuous_gather = 0;

	return;
}
2449 
2450 void totemsrp_event_signal (void *srp_context, enum totem_event_type type, int value)
2451 {
2452  struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
2453 
2454  token_hold_cancel_send (instance);
2455 
2456  return;
2457 }
2458 
	void *srp_context,
	struct iovec *iovec,
	unsigned int iov_len,
	int guarantee)
{
	/*
	 * NOTE(review): the opening line of this definition (presumably
	 * "int totemsrp_mcast (") was lost in extraction; the parameters
	 * above belong to it.
	 *
	 * Queue a multicast message built from the supplied iovec array onto
	 * the pending new-message queue.  Returns 0 on success, -1 if the
	 * queue is full or a buffer cannot be allocated.
	 */
	struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
	int i;
	struct message_item message_item;
	char *addr;
	unsigned int addr_idx;
	struct cs_queue *queue_use;

	/* While waiting for a transitional ack, new messages go to a side queue */
	if (instance->waiting_trans_ack) {
		queue_use = &instance->new_message_queue_trans;
	} else {
		queue_use = &instance->new_message_queue;
	}

	if (cs_queue_is_full (queue_use)) {
		log_printf (instance->totemsrp_log_level_debug, "queue full");
		return (-1);
	}

	memset (&message_item, 0, sizeof (struct message_item));

	/*
	 * Allocate pending item
	 */
	message_item.mcast = totemsrp_buffer_alloc (instance);
	if (message_item.mcast == 0) {
		goto error_mcast;
	}

	/*
	 * Set mcast header
	 */
	memset(message_item.mcast, 0, sizeof (struct mcast));
	message_item.mcast->header.magic = TOTEM_MH_MAGIC;
	message_item.mcast->header.version = TOTEM_MH_VERSION;
	message_item.mcast->header.type = MESSAGE_TYPE_MCAST;
	/* NOTE(review): one assignment lost in extraction here (likely the
	 * header.encapsulated field) — confirm against upstream. */

	message_item.mcast->header.nodeid = instance->my_id.nodeid;
	assert (message_item.mcast->header.nodeid);

	message_item.mcast->guarantee = guarantee;
	srp_addr_copy (&message_item.mcast->system_from, &instance->my_id);

	/* Gather the iovec payload contiguously after the mcast header */
	addr = (char *)message_item.mcast;
	addr_idx = sizeof (struct mcast);
	for (i = 0; i < iov_len; i++) {
		memcpy (&addr[addr_idx], iovec[i].iov_base, iovec[i].iov_len);
		addr_idx += iovec[i].iov_len;
	}

	message_item.msg_len = addr_idx;

	log_printf (instance->totemsrp_log_level_trace, "mcasted message added to pending queue");
	instance->stats.mcast_tx++;
	cs_queue_item_add (queue_use, &message_item);

	return (0);

error_mcast:
	return (-1);
}
2526 
2527 /*
2528  * Determine if there is room to queue a new message
2529  */
2530 int totemsrp_avail (void *srp_context)
2531 {
2532  struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
2533  int avail;
2534  struct cs_queue *queue_use;
2535 
2536  if (instance->waiting_trans_ack) {
2537  queue_use = &instance->new_message_queue_trans;
2538  } else {
2539  queue_use = &instance->new_message_queue;
2540  }
2541  cs_queue_avail (queue_use, &avail);
2542 
2543  return (avail);
2544 }
2545 
2546 /*
2547  * ORF Token Management
2548  */
2549 /*
2550  * Recast message to mcast group if it is available
2551  */
/*
 * Recast message to mcast group if it is available
 *
 * Re-multicast the message with sequence number seq from the appropriate
 * sort queue (recovery queue while in RECOVERY, else the regular queue).
 * Returns 0 on success, -1 if seq is out of range or not present.
 */
static int orf_token_remcast (
	struct totemsrp_instance *instance,
	int seq)
{
	/* NOTE(review): the declaration of the sort_queue_item pointer used
	 * below appears to have been lost in extraction. */
	int res;
	void *ptr;

	struct sq *sort_queue;

	if (instance->memb_state == MEMB_STATE_RECOVERY) {
		sort_queue = &instance->recovery_sort_queue;
	} else {
		sort_queue = &instance->regular_sort_queue;
	}

	res = sq_in_range (sort_queue, seq);
	if (res == 0) {
		log_printf (instance->totemsrp_log_level_debug, "sq not in range");
		return (-1);
	}

	/*
	 * Get RTR item at seq, if not available, return
	 */
	res = sq_item_get (sort_queue, seq, &ptr);
	if (res != 0) {
		return -1;
	}

	sort_queue_item = ptr;

	/* NOTE(review): the multicast call opener (likely totemnet_mcast_noflush_send)
	 * was lost in extraction; its arguments remain below. */
		instance->totemnet_context,
		sort_queue_item->mcast,
		sort_queue_item->msg_len);

	return (0);
}
2591 
2592 
2593 /*
2594  * Free all freeable messages from ring
2595  */
/*
 * Free all freeable messages from ring
 *
 * Release every regular-sort-queue entry up to the minimum of the token's
 * aru, our last seen aru, and our high delivered sequence number — i.e.
 * messages every processor has both received and delivered.
 */
static void messages_free (
	struct totemsrp_instance *instance,
	unsigned int token_aru)
{
	struct sort_queue_item *regular_message;
	unsigned int i;
	int res;
	int log_release = 0;
	unsigned int release_to;
	unsigned int range = 0;

	/* release_to = min(token_aru, my_last_aru, my_high_delivered),
	 * using wrap-aware sequence comparison */
	release_to = token_aru;
	if (sq_lt_compare (instance->my_last_aru, release_to)) {
		release_to = instance->my_last_aru;
	}
	if (sq_lt_compare (instance->my_high_delivered, release_to)) {
		release_to = instance->my_high_delivered;
	}

	/*
	 * Ensure we dont try release before an already released point
	 */
	if (sq_lt_compare (release_to, instance->last_released)) {
		return;
	}

	range = release_to - instance->last_released;
	assert (range < QUEUE_RTR_ITEMS_SIZE_MAX);

	/*
	 * Release retransmit list items if group aru indicates they are transmitted
	 */
	for (i = 1; i <= range; i++) {
		void *ptr;

		res = sq_item_get (&instance->regular_sort_queue,
			instance->last_released + i, &ptr);
		if (res == 0) {
			regular_message = ptr;
			totemsrp_buffer_release (instance, regular_message->mcast);
		}
		sq_items_release (&instance->regular_sort_queue,
			instance->last_released + i);

		log_release = 1;
	}
	instance->last_released += range;

	if (log_release) {
		/* NOTE(review): log_printf opener lost in extraction. */
			"releasing messages up to and including %x", release_to);
	}
}
2649 
2650 static void update_aru (
2651  struct totemsrp_instance *instance)
2652 {
2653  unsigned int i;
2654  int res;
2655  struct sq *sort_queue;
2656  unsigned int range;
2657  unsigned int my_aru_saved = 0;
2658 
2659  if (instance->memb_state == MEMB_STATE_RECOVERY) {
2660  sort_queue = &instance->recovery_sort_queue;
2661  } else {
2662  sort_queue = &instance->regular_sort_queue;
2663  }
2664 
2665  range = instance->my_high_seq_received - instance->my_aru;
2666 
2667  my_aru_saved = instance->my_aru;
2668  for (i = 1; i <= range; i++) {
2669 
2670  void *ptr;
2671 
2672  res = sq_item_get (sort_queue, my_aru_saved + i, &ptr);
2673  /*
2674  * If hole, stop updating aru
2675  */
2676  if (res != 0) {
2677  break;
2678  }
2679  }
2680  instance->my_aru += i - 1;
2681 }
2682 
2683 /*
2684  * Multicasts pending messages onto the ring (requires orf_token possession)
2685  */
/*
 * Multicasts pending messages onto the ring (requires orf_token possession)
 *
 * Drains up to fcc_mcasts_allowed messages from the active pending queue
 * (retransmit queue in RECOVERY, else the regular/transitional new-message
 * queue), stamps each with the next token sequence number and the current
 * ring id, stores it in the sort queue for possible retransmission, and
 * multicasts it.  Returns the number of messages multicast.
 */
static int orf_token_mcast (
	struct totemsrp_instance *instance,
	struct orf_token *token,
	int fcc_mcasts_allowed)
{
	struct message_item *message_item = 0;
	struct cs_queue *mcast_queue;
	struct sq *sort_queue;
	struct sort_queue_item sort_queue_item;
	struct mcast *mcast;
	unsigned int fcc_mcast_current;

	if (instance->memb_state == MEMB_STATE_RECOVERY) {
		mcast_queue = &instance->retrans_message_queue;
		sort_queue = &instance->recovery_sort_queue;
		reset_token_retransmit_timeout (instance); // REVIEWED
	} else {
		if (instance->waiting_trans_ack) {
			mcast_queue = &instance->new_message_queue_trans;
		} else {
			mcast_queue = &instance->new_message_queue;
		}

		sort_queue = &instance->regular_sort_queue;
	}

	for (fcc_mcast_current = 0; fcc_mcast_current < fcc_mcasts_allowed; fcc_mcast_current++) {
		if (cs_queue_is_empty (mcast_queue)) {
			break;
		}
		message_item = (struct message_item *)cs_queue_item_get (mcast_queue);

		/* Each multicast consumes the next token sequence number */
		message_item->mcast->seq = ++token->seq;
		message_item->mcast->this_seqno = instance->global_seqno++;

		/*
		 * Build IO vector
		 */
		memset (&sort_queue_item, 0, sizeof (struct sort_queue_item));
		sort_queue_item.mcast = message_item->mcast;
		sort_queue_item.msg_len = message_item->msg_len;

		mcast = sort_queue_item.mcast;

		memcpy (&mcast->ring_id, &instance->my_ring_id, sizeof (struct memb_ring_id));

		/*
		 * Add message to retransmit queue
		 */
		sq_item_add (sort_queue, &sort_queue_item, message_item->mcast->seq);

		/* NOTE(review): multicast call opener lost in extraction (likely
		 * totemnet_mcast_noflush_send); arguments remain below. */
			instance->totemnet_context,
			message_item->mcast,
			message_item->msg_len);

		/*
		 * Delete item from pending queue
		 */
		cs_queue_item_remove (mcast_queue);

		/*
		 * If messages mcasted, deliver any new messages to totempg
		 */
		instance->my_high_seq_received = token->seq;
	}

	update_aru (instance);

	/*
	 * Return 1 if more messages are available for single node clusters
	 */
	return (fcc_mcast_current);
}
2760 
2761 /*
2762  * Remulticasts messages in orf_token's retransmit list (requires orf_token)
2763  * Modify's orf_token's rtr to include retransmits required by this process
2764  */
/*
 * Remulticasts messages in orf_token's retransmit list (requires orf_token)
 * Modify's orf_token's rtr to include retransmits required by this process
 *
 * First services the token's existing RTR list (re-multicasting what we
 * hold, bounded by *fcc_allowed), then appends our own missing sequence
 * numbers to the list.  *fcc_allowed is decremented by the number of
 * remulticasts performed; that count is also the return value.
 */
static int orf_token_rtr (
	struct totemsrp_instance *instance,
	struct orf_token *orf_token,
	unsigned int *fcc_allowed)
{
	unsigned int res;
	unsigned int i, j;
	unsigned int found;
	struct sq *sort_queue;
	struct rtr_item *rtr_list;
	unsigned int range = 0;
	char retransmit_msg[1024];
	char value[64];

	if (instance->memb_state == MEMB_STATE_RECOVERY) {
		sort_queue = &instance->recovery_sort_queue;
	} else {
		sort_queue = &instance->regular_sort_queue;
	}

	rtr_list = &orf_token->rtr_list[0];

	/* Debug dump of the incoming retransmit list */
	strcpy (retransmit_msg, "Retransmit List: ");
	if (orf_token->rtr_list_entries) {
		/* NOTE(review): log_printf opener lost in extraction. */
			"Retransmit List %d", orf_token->rtr_list_entries);
		for (i = 0; i < orf_token->rtr_list_entries; i++) {
			sprintf (value, "%x ", rtr_list[i].seq);
			strcat (retransmit_msg, value);
		}
		strcat (retransmit_msg, "");
		/* NOTE(review): log_printf opener lost in extraction. */
			"%s", retransmit_msg);
	}

	/*
	 * Retransmit messages on orf_token's RTR list from RTR queue
	 */
	for (instance->fcc_remcast_current = 0, i = 0;
		instance->fcc_remcast_current < *fcc_allowed && i < orf_token->rtr_list_entries;) {

		/*
		 * If this retransmit request isn't from this configuration,
		 * try next rtr entry
		 */
		if (memcmp (&rtr_list[i].ring_id, &instance->my_ring_id,
			sizeof (struct memb_ring_id)) != 0) {

			i += 1;
			continue;
		}

		res = orf_token_remcast (instance, rtr_list[i].seq);
		if (res == 0) {
			/*
			 * Multicasted message, so no need to copy to new retransmit list
			 */
			orf_token->rtr_list_entries -= 1;
			assert (orf_token->rtr_list_entries >= 0);
			/* Compact the list over the serviced entry */
			memmove (&rtr_list[i], &rtr_list[i + 1],
				sizeof (struct rtr_item) * (orf_token->rtr_list_entries - i));

			instance->stats.mcast_retx++;
			instance->fcc_remcast_current++;
		} else {
			i += 1;
		}
	}
	*fcc_allowed = *fcc_allowed - instance->fcc_remcast_current;

	/*
	 * Add messages to retransmit to RTR list
	 * but only retry if there is room in the retransmit list
	 */

	range = orf_token->seq - instance->my_aru;
	assert (range < QUEUE_RTR_ITEMS_SIZE_MAX);

	for (i = 1; (orf_token->rtr_list_entries < RETRANSMIT_ENTRIES_MAX) &&
		(i <= range); i++) {

		/*
		 * Ensure message is within the sort queue range
		 */
		res = sq_in_range (sort_queue, instance->my_aru + i);
		if (res == 0) {
			break;
		}

		/*
		 * Find if a message is missing from this processor
		 */
		res = sq_item_inuse (sort_queue, instance->my_aru + i);
		if (res == 0) {
			/*
			 * Determine how many times we have missed receiving
			 * this sequence number. sq_item_miss_count increments
			 * a counter for the sequence number. The miss count
			 * will be returned and compared. This allows time for
			 * delayed multicast messages to be received before
			 * declaring the message is missing and requesting a
			 * retransmit.
			 */
			res = sq_item_miss_count (sort_queue, instance->my_aru + i);
			if (res < instance->totem_config->miss_count_const) {
				continue;
			}

			/*
			 * Determine if missing message is already in retransmit list
			 */
			found = 0;
			for (j = 0; j < orf_token->rtr_list_entries; j++) {
				if (instance->my_aru + i == rtr_list[j].seq) {
					found = 1;
				}
			}
			if (found == 0) {
				/*
				 * Missing message not found in current retransmit list so add it
				 */
				memcpy (&rtr_list[orf_token->rtr_list_entries].ring_id,
					&instance->my_ring_id, sizeof (struct memb_ring_id));
				rtr_list[orf_token->rtr_list_entries].seq = instance->my_aru + i;
				orf_token->rtr_list_entries++;
			}
		}
	}
	return (instance->fcc_remcast_current);
}
2895 
/*
 * Resend the last transmitted token (or commit token) from the saved
 * retransmit copy in instance->orf_token_retransmit.
 */
static void token_retransmit (struct totemsrp_instance *instance)
{
	/* NOTE(review): the send-call opener (likely totemnet_token_send
	 * (instance->totemnet_context,) was lost in extraction; its trailing
	 * arguments remain below. */
		instance->orf_token_retransmit,
		instance->orf_token_retransmit_size);
}
2902 
2903 /*
2904  * Retransmit the regular token if no mcast or token has
2905  * been received in retransmit token period retransmit
2906  * the token to the next processor
2907  */
/*
 * Retransmit the regular token if no mcast or token has
 * been received in retransmit token period retransmit
 * the token to the next processor
 */
static void timer_function_token_retransmit_timeout (void *data)
{
	struct totemsrp_instance *instance = data;

	switch (instance->memb_state) {
	case MEMB_STATE_GATHER:
		/* No token circulates while gathering */
		break;
	case MEMB_STATE_COMMIT:
	/* NOTE(review): a case label (likely MEMB_STATE_OPERATIONAL) appears
	 * to have been lost in extraction here — confirm against upstream. */
	case MEMB_STATE_RECOVERY:
		token_retransmit (instance);
		reset_token_retransmit_timeout (instance); // REVIEWED
		break;
	}
}
2923 
/*
 * Timer fired while the token is being held: resend the held token once
 * the hold period expires (only in states where a token circulates).
 */
static void timer_function_token_hold_retransmit_timeout (void *data)
{
	struct totemsrp_instance *instance = data;

	switch (instance->memb_state) {
	case MEMB_STATE_GATHER:
		break;
	case MEMB_STATE_COMMIT:
		break;
	/* NOTE(review): a case label (likely MEMB_STATE_OPERATIONAL) appears
	 * to have been lost in extraction here — confirm against upstream. */
	case MEMB_STATE_RECOVERY:
		token_retransmit (instance);
		break;
	}
}
2939 
/*
 * Periodic merge-detect timer: the ring representative multicasts a
 * merge-detect message so partitioned rings can discover each other.
 */
static void timer_function_merge_detect_timeout(void *data)
{
	struct totemsrp_instance *instance = data;

	/* NOTE(review): a statement and a case label (likely clearing
	 * my_merge_detect_timeout_active and MEMB_STATE_OPERATIONAL:) were
	 * lost in extraction; the if below presumably belongs to the
	 * OPERATIONAL case — confirm against upstream. */

	switch (instance->memb_state) {
		if (instance->my_ring_id.rep == instance->my_id.nodeid) {
			memb_merge_detect_transmit (instance);
		}
		break;
	case MEMB_STATE_GATHER:
	case MEMB_STATE_COMMIT:
	case MEMB_STATE_RECOVERY:
		break;
	}
}
2958 
2959 /*
2960  * Send orf_token to next member (requires orf_token)
2961  */
/*
 * Send orf_token to next member (requires orf_token)
 *
 * Always saves a copy of the token for later retransmission; actually
 * forwards it on the wire only when forward_token is nonzero.  Returns
 * the network send result (0 when not forwarding).
 */
static int token_send (
	struct totemsrp_instance *instance,
	struct orf_token *orf_token,
	int forward_token)
{
	int res = 0;
	unsigned int orf_token_size;

	/* Size = fixed header + variable-length retransmit list */
	orf_token_size = sizeof (struct orf_token) +
		(orf_token->rtr_list_entries * sizeof (struct rtr_item));

	orf_token->header.nodeid = instance->my_id.nodeid;
	/* Keep a copy so token_retransmit() can resend on loss */
	memcpy (instance->orf_token_retransmit, orf_token, orf_token_size);
	instance->orf_token_retransmit_size = orf_token_size;
	assert (orf_token->header.nodeid);

	if (forward_token == 0) {
		return (0);
	}

	/* NOTE(review): the send-call opener (likely res = totemnet_token_send
	 * (instance->totemnet_context,) was lost in extraction. */
		orf_token,
		orf_token_size);

	return (res);
}
2988 
2989 static int token_hold_cancel_send (struct totemsrp_instance *instance)
2990 {
2991  struct token_hold_cancel token_hold_cancel;
2992 
2993  /*
2994  * Only cancel if the token is currently held
2995  */
2996  if (instance->my_token_held == 0) {
2997  return (0);
2998  }
2999  instance->my_token_held = 0;
3000 
3001  /*
3002  * Build message
3003  */
3004  token_hold_cancel.header.magic = TOTEM_MH_MAGIC;
3005  token_hold_cancel.header.version = TOTEM_MH_VERSION;
3006  token_hold_cancel.header.type = MESSAGE_TYPE_TOKEN_HOLD_CANCEL;
3007  token_hold_cancel.header.encapsulated = 0;
3008  token_hold_cancel.header.nodeid = instance->my_id.nodeid;
3009  memcpy (&token_hold_cancel.ring_id, &instance->my_ring_id,
3010  sizeof (struct memb_ring_id));
3011  assert (token_hold_cancel.header.nodeid);
3012 
3013  instance->stats.token_hold_cancel_tx++;
3014 
3015  totemnet_mcast_flush_send (instance->totemnet_context, &token_hold_cancel,
3016  sizeof (struct token_hold_cancel));
3017 
3018  return (0);
3019 }
3020 
3021 static int orf_token_send_initial (struct totemsrp_instance *instance)
3022 {
3023  struct orf_token orf_token;
3024  int res;
3025 
3026  orf_token.header.magic = TOTEM_MH_MAGIC;
3027  orf_token.header.version = TOTEM_MH_VERSION;
3028  orf_token.header.type = MESSAGE_TYPE_ORF_TOKEN;
3029  orf_token.header.encapsulated = 0;
3030  orf_token.header.nodeid = instance->my_id.nodeid;
3031  assert (orf_token.header.nodeid);
3032  orf_token.seq = SEQNO_START_MSG;
3033  orf_token.token_seq = SEQNO_START_TOKEN;
3034  orf_token.retrans_flg = 1;
3035  instance->my_set_retrans_flg = 1;
3036  instance->stats.orf_token_tx++;
3037 
3038  if (cs_queue_is_empty (&instance->retrans_message_queue) == 1) {
3039  orf_token.retrans_flg = 0;
3040  instance->my_set_retrans_flg = 0;
3041  } else {
3042  orf_token.retrans_flg = 1;
3043  instance->my_set_retrans_flg = 1;
3044  }
3045 
3046  orf_token.aru = 0;
3047  orf_token.aru = SEQNO_START_MSG - 1;
3048  orf_token.aru_addr = instance->my_id.nodeid;
3049 
3050  memcpy (&orf_token.ring_id, &instance->my_ring_id, sizeof (struct memb_ring_id));
3051  orf_token.fcc = 0;
3052  orf_token.backlog = 0;
3053 
3054  orf_token.rtr_list_entries = 0;
3055 
3056  res = token_send (instance, &orf_token, 1);
3057 
3058  return (res);
3059 }
3060 
/*
 * Fill in this processor's entry of the circulating commit token and
 * advance its memb_index.  Also recomputes received flags: any member
 * sharing a ring id whose aru lags the highest aru seen for that ring id
 * is marked as not having received everything.
 */
static void memb_state_commit_token_update (
	struct totemsrp_instance *instance)
{
	struct srp_addr *addr;
	struct memb_commit_token_memb_entry *memb_list;
	unsigned int high_aru;
	unsigned int i;

	/* Address list and per-member entries trail the fixed token header */
	addr = (struct srp_addr *)instance->commit_token->end_of_commit_token;
	memb_list = (struct memb_commit_token_memb_entry *)(addr + instance->commit_token->addr_entries);

	/* Adopt the token's address list as the new membership */
	memcpy (instance->my_new_memb_list, addr,
		sizeof (struct srp_addr) * instance->commit_token->addr_entries);

	instance->my_new_memb_entries = instance->commit_token->addr_entries;

	/* Record our old-ring state into our slot of the token */
	memcpy (&memb_list[instance->commit_token->memb_index].ring_id,
		&instance->my_old_ring_id, sizeof (struct memb_ring_id));

	memb_list[instance->commit_token->memb_index].aru = instance->old_ring_state_aru;
	/*
	 * TODO high delivered is really instance->my_aru, but with safe this
	 * could change?
	 */
	instance->my_received_flg =
		(instance->my_aru == instance->my_high_seq_received);

	memb_list[instance->commit_token->memb_index].received_flg = instance->my_received_flg;

	memb_list[instance->commit_token->memb_index].high_delivered = instance->my_high_delivered;
	/*
	 * find high aru up to current memb_index for all matching ring ids
	 * if any ring id matching memb_index has aru less then high aru set
	 * received flag for that entry to false
	 */
	high_aru = memb_list[instance->commit_token->memb_index].aru;
	for (i = 0; i <= instance->commit_token->memb_index; i++) {
		if (memcmp (&memb_list[instance->commit_token->memb_index].ring_id,
			&memb_list[i].ring_id,
			sizeof (struct memb_ring_id)) == 0) {

			/* wrap-aware sequence comparison */
			if (sq_lt_compare (high_aru, memb_list[i].aru)) {
				high_aru = memb_list[i].aru;
			}
		}
	}

	for (i = 0; i <= instance->commit_token->memb_index; i++) {
		if (memcmp (&memb_list[instance->commit_token->memb_index].ring_id,
			&memb_list[i].ring_id,
			sizeof (struct memb_ring_id)) == 0) {

			if (sq_lt_compare (memb_list[i].aru, high_aru)) {
				memb_list[i].received_flg = 0;
				if (i == instance->commit_token->memb_index) {
					instance->my_received_flg = 0;
				}
			}
		}
	}

	/* Pass the token on: stamp our nodeid and advance the index */
	instance->commit_token->header.nodeid = instance->my_id.nodeid;
	instance->commit_token->memb_index += 1;
	assert (instance->commit_token->memb_index <= instance->commit_token->addr_entries);
	assert (instance->commit_token->header.nodeid);
}
3127 
/*
 * Point totemnet at the next member in the commit token's address list
 * (wrapping around via modulo) as the token's destination.
 */
static void memb_state_commit_token_target_set (
	struct totemsrp_instance *instance)
{
	struct srp_addr *addr;

	addr = (struct srp_addr *)instance->commit_token->end_of_commit_token;

	/* Totemnet just looks at the node id */
	/* NOTE(review): the call opener (likely totemnet_token_target_set ()
	 * was lost in extraction; its arguments remain below. */
		instance->totemnet_context,
		addr[instance->commit_token->memb_index %
		instance->commit_token->addr_entries].nodeid);
}
3141 
/*
 * Send the supplied commit token onward during RECOVERY: bump its token
 * sequence, stamp our nodeid, save a retransmit copy, transmit, and arm
 * the retransmit timer in case the token is lost.  Always returns 0.
 */
static int memb_state_commit_token_send_recovery (
	struct totemsrp_instance *instance,
	struct memb_commit_token *commit_token)
{
	unsigned int commit_token_size;

	commit_token->token_seq++;
	commit_token->header.nodeid = instance->my_id.nodeid;
	/* Size = fixed header + (address + member entry) per addressed node */
	commit_token_size = sizeof (struct memb_commit_token) +
		((sizeof (struct srp_addr) +
			sizeof (struct memb_commit_token_memb_entry)) * commit_token->addr_entries);
	/*
	 * Make a copy for retransmission if necessary
	 */
	memcpy (instance->orf_token_retransmit, commit_token, commit_token_size);
	instance->orf_token_retransmit_size = commit_token_size;

	instance->stats.memb_commit_token_tx++;

	/* NOTE(review): the send-call opener (likely totemnet_token_send
	 * (instance->totemnet_context,) was lost in extraction. */
		commit_token,
		commit_token_size);

	/*
	 * Request retransmission of the commit token in case it is lost
	 */
	reset_token_retransmit_timeout (instance);
	return (0);
}
3171 
/*
 * Send this instance's stored commit token onward: bump its token
 * sequence, stamp our nodeid, save a retransmit copy, transmit, and arm
 * the retransmit timer in case the token is lost.  Always returns 0.
 */
static int memb_state_commit_token_send (
	struct totemsrp_instance *instance)
{
	unsigned int commit_token_size;

	instance->commit_token->token_seq++;
	instance->commit_token->header.nodeid = instance->my_id.nodeid;
	/* Size = fixed header + (address + member entry) per addressed node */
	commit_token_size = sizeof (struct memb_commit_token) +
		((sizeof (struct srp_addr) +
			sizeof (struct memb_commit_token_memb_entry)) * instance->commit_token->addr_entries);
	/*
	 * Make a copy for retransmission if necessary
	 */
	memcpy (instance->orf_token_retransmit, instance->commit_token, commit_token_size);
	instance->orf_token_retransmit_size = commit_token_size;

	instance->stats.memb_commit_token_tx++;

	/* NOTE(review): the send-call opener (likely totemnet_token_send
	 * (instance->totemnet_context,) was lost in extraction. */
		instance->commit_token,
		commit_token_size);

	/*
	 * Request retransmission of the commit token in case it is lost
	 */
	reset_token_retransmit_timeout (instance);
	return (0);
}
3200 
3201 
3202 static int memb_lowest_in_config (struct totemsrp_instance *instance)
3203 {
3204  struct srp_addr token_memb[PROCESSOR_COUNT_MAX];
3205  int token_memb_entries = 0;
3206  int i;
3207  unsigned int lowest_nodeid;
3208 
3209  memb_set_subtract (token_memb, &token_memb_entries,
3210  instance->my_proc_list, instance->my_proc_list_entries,
3211  instance->my_failed_list, instance->my_failed_list_entries);
3212 
3213  /*
3214  * find representative by searching for smallest identifier
3215  */
3216  assert(token_memb_entries > 0);
3217 
3218  lowest_nodeid = token_memb[0].nodeid;
3219  for (i = 1; i < token_memb_entries; i++) {
3220  if (lowest_nodeid > token_memb[i].nodeid) {
3221  lowest_nodeid = token_memb[i].nodeid;
3222  }
3223  }
3224  return (lowest_nodeid == instance->my_id.nodeid);
3225 }
3226 
3227 static int srp_addr_compare (const void *a, const void *b)
3228 {
3229  const struct srp_addr *srp_a = (const struct srp_addr *)a;
3230  const struct srp_addr *srp_b = (const struct srp_addr *)b;
3231 
3232  if (srp_a->nodeid < srp_b->nodeid) {
3233  return -1;
3234  } else if (srp_a->nodeid > srp_b->nodeid) {
3235  return 1;
3236  } else {
3237  return 0;
3238  }
3239 }
3240 
/*
 * Build a fresh commit token (called only on the ring representative):
 * membership is proc_list minus failed_list, sorted by nodeid so the
 * token traverses the ring in a deterministic order; the new ring id
 * takes our nodeid as rep and the previous ring sequence plus four.
 */
static void memb_state_commit_token_create (
	struct totemsrp_instance *instance)
{
	struct srp_addr token_memb[PROCESSOR_COUNT_MAX];
	struct srp_addr *addr;
	struct memb_commit_token_memb_entry *memb_list;
	int token_memb_entries = 0;

	/* NOTE(review): log_printf opener lost in extraction. */
		"Creating commit token because I am the rep.");

	memb_set_subtract (token_memb, &token_memb_entries,
		instance->my_proc_list, instance->my_proc_list_entries,
		instance->my_failed_list, instance->my_failed_list_entries);

	memset (instance->commit_token, 0, sizeof (struct memb_commit_token));
	instance->commit_token->header.magic = TOTEM_MH_MAGIC;
	/* NOTE(review): two assignments (likely header.version and header.type)
	 * were lost in extraction here — confirm against upstream. */
	instance->commit_token->header.encapsulated = 0;
	instance->commit_token->header.nodeid = instance->my_id.nodeid;
	assert (instance->commit_token->header.nodeid);

	instance->commit_token->ring_id.rep = instance->my_id.nodeid;
	instance->commit_token->ring_id.seq = instance->token_ring_id_seq + 4;

	/*
	 * This qsort is necessary to ensure the commit token traverses
	 * the ring in the proper order
	 */
	qsort (token_memb, token_memb_entries, sizeof (struct srp_addr),
		srp_addr_compare);

	instance->commit_token->memb_index = 0;
	instance->commit_token->addr_entries = token_memb_entries;

	/* Variable-length address list and zeroed member entries follow header */
	addr = (struct srp_addr *)instance->commit_token->end_of_commit_token;
	memb_list = (struct memb_commit_token_memb_entry *)(addr + instance->commit_token->addr_entries);

	memcpy (addr, token_memb,
		token_memb_entries * sizeof (struct srp_addr));
	memset (memb_list, 0,
		sizeof (struct memb_commit_token_memb_entry) * token_memb_entries);
}
3285 
/*
 * Build and multicast a JOIN message advertising this node's current
 * proc (membership) and failed lists; used to drive the gather phase
 * of the membership protocol.
 */
static void memb_join_message_send (struct totemsrp_instance *instance)
{
	char memb_join_data[40000];	/* on-stack staging buffer: header + both lists */
	struct memb_join *memb_join = (struct memb_join *)memb_join_data;
	char *addr;
	unsigned int addr_idx;
	size_t msg_len;

	memb_join->header.magic = TOTEM_MH_MAGIC;
	memb_join->header.version = TOTEM_MH_VERSION;
	memb_join->header.type = MESSAGE_TYPE_MEMB_JOIN;
	memb_join->header.encapsulated = 0;
	memb_join->header.nodeid = instance->my_id.nodeid;
	assert (memb_join->header.nodeid);

	/*
	 * Refuse (rather than overflow) a message whose variable-length
	 * lists would not fit in the staging buffer.
	 */
	msg_len = sizeof(struct memb_join) +
		((instance->my_proc_list_entries + instance->my_failed_list_entries) * sizeof(struct srp_addr));

	if (msg_len > sizeof(memb_join_data)) {
		log_printf (instance->totemsrp_log_level_error,
			"memb_join_message too long. Ignoring message.");

		return ;
	}

	memb_join->ring_seq = instance->my_ring_id.seq;
	memb_join->proc_list_entries = instance->my_proc_list_entries;
	memb_join->failed_list_entries = instance->my_failed_list_entries;
	srp_addr_copy (&memb_join->system_from, &instance->my_id);

	/*
	 * This mess adds the joined and failed processor lists into the join
	 * message
	 */
	addr = (char *)memb_join;
	addr_idx = sizeof (struct memb_join);
	memcpy (&addr[addr_idx],
		instance->my_proc_list,
		instance->my_proc_list_entries *
		sizeof (struct srp_addr));
	addr_idx +=
		instance->my_proc_list_entries *
		sizeof (struct srp_addr);
	memcpy (&addr[addr_idx],
		instance->my_failed_list,
		instance->my_failed_list_entries *
		sizeof (struct srp_addr));
	addr_idx +=
		instance->my_failed_list_entries *
		sizeof (struct srp_addr);

	/*
	 * Random delay bounded by send_join_timeout (ms) spreads join
	 * traffic out when many nodes enter gather simultaneously.
	 */
	if (instance->totem_config->send_join_timeout) {
		usleep (random() % (instance->totem_config->send_join_timeout * 1000));
	}

	instance->stats.memb_join_tx++;

		instance->totemnet_context,
		memb_join,
		addr_idx);
}
3348 
/*
 * Announce this node's departure: send a JOIN message whose header nodeid
 * is LEAVE_DUMMY_NODEID, with this node moved to the failed list and
 * removed from the active member list.
 */
static void memb_leave_message_send (struct totemsrp_instance *instance)
{
	char memb_join_data[40000];	/* on-stack staging buffer: header + both lists */
	struct memb_join *memb_join = (struct memb_join *)memb_join_data;
	char *addr;
	unsigned int addr_idx;
	int active_memb_entries;
	struct srp_addr active_memb[PROCESSOR_COUNT_MAX];
	size_t msg_len;

		"sending join/leave message");

	/*
	 * add us to the failed list, and remove us from
	 * the members list
	 */
	memb_set_merge(
		&instance->my_id, 1,
		instance->my_failed_list, &instance->my_failed_list_entries);

	memb_set_subtract (active_memb, &active_memb_entries,
		instance->my_proc_list, instance->my_proc_list_entries,
		&instance->my_id, 1);

	/* Refuse to overflow the on-stack staging buffer. */
	msg_len = sizeof(struct memb_join) +
		((active_memb_entries + instance->my_failed_list_entries) * sizeof(struct srp_addr));

	if (msg_len > sizeof(memb_join_data)) {
		log_printf (instance->totemsrp_log_level_error,
			"memb_leave message too long. Ignoring message.");

		return ;
	}

	memb_join->header.magic = TOTEM_MH_MAGIC;
	memb_join->header.version = TOTEM_MH_VERSION;
	memb_join->header.type = MESSAGE_TYPE_MEMB_JOIN;
	memb_join->header.encapsulated = 0;
	/* Dummy nodeid marks this join message as a leave announcement. */
	memb_join->header.nodeid = LEAVE_DUMMY_NODEID;

	memb_join->ring_seq = instance->my_ring_id.seq;
	memb_join->proc_list_entries = active_memb_entries;
	memb_join->failed_list_entries = instance->my_failed_list_entries;
	srp_addr_copy (&memb_join->system_from, &instance->my_id);

	// TODO: CC Maybe use the actual join send routine.
	/*
	 * This mess adds the joined and failed processor lists into the join
	 * message
	 */
	addr = (char *)memb_join;
	addr_idx = sizeof (struct memb_join);
	memcpy (&addr[addr_idx],
		active_memb,
		active_memb_entries *
		sizeof (struct srp_addr));
	addr_idx +=
		active_memb_entries *
		sizeof (struct srp_addr);
	memcpy (&addr[addr_idx],
		instance->my_failed_list,
		instance->my_failed_list_entries *
		sizeof (struct srp_addr));
	addr_idx +=
		instance->my_failed_list_entries *
		sizeof (struct srp_addr);


	/* Random backoff (bounded by send_join_timeout ms) avoids bursts. */
	if (instance->totem_config->send_join_timeout) {
		usleep (random() % (instance->totem_config->send_join_timeout * 1000));
	}
	instance->stats.memb_join_tx++;

		instance->totemnet_context,
		memb_join,
		addr_idx);
}
3428 
/*
 * Multicast a memb_merge_detect message carrying our current ring id so
 * that nodes in a different configuration can notice us and start a
 * gather/merge (see message_handler_memb_merge_detect).
 */
static void memb_merge_detect_transmit (struct totemsrp_instance *instance)
{
	struct memb_merge_detect memb_merge_detect;

	memb_merge_detect.header.magic = TOTEM_MH_MAGIC;
	memb_merge_detect.header.version = TOTEM_MH_VERSION;
	memb_merge_detect.header.type = MESSAGE_TYPE_MEMB_MERGE_DETECT;
	memb_merge_detect.header.encapsulated = 0;
	memb_merge_detect.header.nodeid = instance->my_id.nodeid;
	srp_addr_copy (&memb_merge_detect.system_from, &instance->my_id);
	memcpy (&memb_merge_detect.ring_id, &instance->my_ring_id,
		sizeof (struct memb_ring_id));
	assert (memb_merge_detect.header.nodeid);

	instance->stats.memb_merge_detect_tx++;
		&memb_merge_detect,
		sizeof (struct memb_merge_detect));
}
3448 
3449 static void memb_ring_id_set (
3450  struct totemsrp_instance *instance,
3451  const struct memb_ring_id *ring_id)
3452 {
3453 
3454  memcpy (&instance->my_ring_id, ring_id, sizeof (struct memb_ring_id));
3455 }
3456 
	void *srp_context,
	void **handle_out,
	enum totem_callback_token_type type,
	int delete,
	int (*callback_fn) (enum totem_callback_token_type type, const void *),
	const void *data)
{
	struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
	struct token_callback_instance *callback_handle;

	/*
	 * NOTE(review): presumably cancels a held token so the new callback
	 * fires on the next rotation -- confirm against token_hold_cancel_send.
	 */
	token_hold_cancel_send (instance);

	callback_handle = malloc (sizeof (struct token_callback_instance));
	if (callback_handle == 0) {
		/* Out of memory: report failure to the caller. */
		return (-1);
	}
	*handle_out = (void *)callback_handle;
	qb_list_init (&callback_handle->list);
	callback_handle->callback_fn = callback_fn;
	callback_handle->data = (void *) data;
	callback_handle->callback_type = type;
	/* delete == 1 makes this a one-shot callback (see token_callbacks_execute). */
	callback_handle->delete = delete;
	switch (type) {
		qb_list_add (&callback_handle->list, &instance->token_callback_received_listhead);
		break;
		qb_list_add (&callback_handle->list, &instance->token_callback_sent_listhead);
		break;
	}

	return (0);
}
3491 
3492 void totemsrp_callback_token_destroy (void *srp_context, void **handle_out)
3493 {
3494  struct token_callback_instance *h;
3495 
3496  if (*handle_out) {
3497  h = (struct token_callback_instance *)*handle_out;
3498  qb_list_del (&h->list);
3499  free (h);
3500  h = NULL;
3501  *handle_out = 0;
3502  }
3503 }
3504 
/*
 * Invoke every registered token callback of the given type.  One-shot
 * callbacks (delete == 1) are unlinked before invocation; if such a
 * callback returns -1 it is re-queued to run on the next token,
 * otherwise it is freed.
 */
static void token_callbacks_execute (
	struct totemsrp_instance *instance,
	enum totem_callback_token_type type)
{
	struct qb_list_head *list, *tmp_iter;
	struct qb_list_head *callback_listhead = 0;
	int res;
	int del;

	/* Pick the received or sent callback list for this token event. */
	switch (type) {
		callback_listhead = &instance->token_callback_received_listhead;
		break;
		callback_listhead = &instance->token_callback_sent_listhead;
		break;
	default:
		assert (0);
	}

	/* Safe iteration: entries may be unlinked/re-added while walking. */
	qb_list_for_each_safe(list, tmp_iter, callback_listhead) {
		token_callback_instance = qb_list_entry (list, struct token_callback_instance, list);
		del = token_callback_instance->delete;
		/* Unlink one-shot callbacks before calling them. */
		if (del == 1) {
			qb_list_del (list);
		}

		res = token_callback_instance->callback_fn (
			token_callback_instance->callback_type,
			token_callback_instance->data);
		/*
		 * This callback failed to execute, try it again on the next token
		 */
		if (res == -1 && del == 1) {
			qb_list_add (list, callback_listhead);
		} else if (del) {
			free (token_callback_instance);
		}
	}
}
3546 
3547 /*
3548  * Flow control functions
3549  */
3550 static unsigned int backlog_get (struct totemsrp_instance *instance)
3551 {
3552  unsigned int backlog = 0;
3553  struct cs_queue *queue_use = NULL;
3554 
3555  if (instance->memb_state == MEMB_STATE_OPERATIONAL) {
3556  if (instance->waiting_trans_ack) {
3557  queue_use = &instance->new_message_queue_trans;
3558  } else {
3559  queue_use = &instance->new_message_queue;
3560  }
3561  } else
3562  if (instance->memb_state == MEMB_STATE_RECOVERY) {
3563  queue_use = &instance->retrans_message_queue;
3564  }
3565 
3566  if (queue_use != NULL) {
3567  backlog = cs_queue_used (queue_use);
3568  }
3569 
3570  instance->stats.token[instance->stats.latest_token].backlog_calc = backlog;
3571  return (backlog);
3572 }
3573 
3574 static int fcc_calculate (
3575  struct totemsrp_instance *instance,
3576  struct orf_token *token)
3577 {
3578  unsigned int transmits_allowed;
3579  unsigned int backlog_calc;
3580 
3581  transmits_allowed = instance->totem_config->max_messages;
3582 
3583  if (transmits_allowed > instance->totem_config->window_size - token->fcc) {
3584  transmits_allowed = instance->totem_config->window_size - token->fcc;
3585  }
3586 
3587  instance->my_cbl = backlog_get (instance);
3588 
3589  /*
3590  * Only do backlog calculation if there is a backlog otherwise
3591  * we would result in div by zero
3592  */
3593  if (token->backlog + instance->my_cbl - instance->my_pbl) {
3594  backlog_calc = (instance->totem_config->window_size * instance->my_pbl) /
3595  (token->backlog + instance->my_cbl - instance->my_pbl);
3596  if (backlog_calc > 0 && transmits_allowed > backlog_calc) {
3597  transmits_allowed = backlog_calc;
3598  }
3599  }
3600 
3601  return (transmits_allowed);
3602 }
3603 
3604 /*
3605  * don't overflow the RTR sort queue
3606  */
3607 static void fcc_rtr_limit (
3608  struct totemsrp_instance *instance,
3609  struct orf_token *token,
3610  unsigned int *transmits_allowed)
3611 {
3612  int check = QUEUE_RTR_ITEMS_SIZE_MAX;
3613  check -= (*transmits_allowed + instance->totem_config->window_size);
3614  assert (check >= 0);
3615  if (sq_lt_compare (instance->last_released +
3616  QUEUE_RTR_ITEMS_SIZE_MAX - *transmits_allowed -
3617  instance->totem_config->window_size,
3618 
3619  token->seq)) {
3620 
3621  *transmits_allowed = 0;
3622  }
3623 }
3624 
3625 static void fcc_token_update (
3626  struct totemsrp_instance *instance,
3627  struct orf_token *token,
3628  unsigned int msgs_transmitted)
3629 {
3630  token->fcc += msgs_transmitted - instance->my_trc;
3631  token->backlog += instance->my_cbl - instance->my_pbl;
3632  instance->my_trc = msgs_transmitted;
3633  instance->my_pbl = instance->my_cbl;
3634 }
3635 
3636 /*
3637  * Sanity checkers
3638  */
/*
 * Validate a received orf_token before any field is trusted: bounded
 * overall length, minimum fixed header, sane rtr_list_entries, and
 * enough bytes for the claimed retransmit list.
 * Returns 0 when usable, -1 when the message must be ignored.
 */
static int check_orf_token_sanity(
	const struct totemsrp_instance *instance,
	const void *msg,
	size_t msg_len,
	size_t max_msg_len,
	int endian_conversion_needed)
{
	int rtr_entries;
	const struct orf_token *token = (const struct orf_token *)msg;
	size_t required_len;

	if (msg_len > max_msg_len) {
		"Received orf_token message is too long... ignoring.");

		return (-1);
	}

	if (msg_len < sizeof(struct orf_token)) {
		"Received orf_token message is too short... ignoring.");

		return (-1);
	}

	/* Byte-swap the entry count before trusting it. */
	if (endian_conversion_needed) {
		rtr_entries = swab32(token->rtr_list_entries);
	} else {
		rtr_entries = token->rtr_list_entries;
	}

	if (rtr_entries > RETRANSMIT_ENTRIES_MAX) {
		"Received orf_token message rtr_entries is corrupted... ignoring.");

		return (-1);
	}

	/* Header plus the retransmit list the token claims to carry. */
	required_len = sizeof(struct orf_token) + rtr_entries * sizeof(struct rtr_item);
	if (msg_len < required_len) {
		"Received orf_token message is too short... ignoring.");

		return (-1);
	}

	return (0);
}
3687 
/*
 * Validate that a received mcast message at least contains the fixed
 * mcast header.  Returns 0 when usable, -1 when it must be ignored.
 */
static int check_mcast_sanity(
	struct totemsrp_instance *instance,
	const void *msg,
	size_t msg_len,
	int endian_conversion_needed)
{

	if (msg_len < sizeof(struct mcast)) {
		"Received mcast message is too short... ignoring.");

		return (-1);
	}

	return (0);
}
3704 
/*
 * Validate that a received memb_merge_detect message is at least the
 * size of its fixed structure.  Returns 0 when usable, -1 otherwise.
 */
static int check_memb_merge_detect_sanity(
	struct totemsrp_instance *instance,
	const void *msg,
	size_t msg_len,
	int endian_conversion_needed)
{

	if (msg_len < sizeof(struct memb_merge_detect)) {
		"Received memb_merge_detect message is too short... ignoring.");

		return (-1);
	}

	return (0);
}
3721 
/*
 * Validate the size of a received memb_join message: the buffer must
 * hold the fixed header plus the proc and failed lists it claims.
 * Returns 0 when usable, -1 when the message must be ignored.
 */
static int check_memb_join_sanity(
	struct totemsrp_instance *instance,
	const void *msg,
	size_t msg_len,
	int endian_conversion_needed)
{
	const struct memb_join *mj_msg = (const struct memb_join *)msg;
	unsigned int proc_list_entries;
	unsigned int failed_list_entries;
	size_t required_len;

	if (msg_len < sizeof(struct memb_join)) {
		"Received memb_join message is too short... ignoring.");

		return (-1);
	}

	proc_list_entries = mj_msg->proc_list_entries;
	failed_list_entries = mj_msg->failed_list_entries;

	/* Byte-swap the counts before trusting them. */
	if (endian_conversion_needed) {
		proc_list_entries = swab32(proc_list_entries);
		failed_list_entries = swab32(failed_list_entries);
	}

	/*
	 * NOTE(review): the entry counts come off the wire and are not
	 * bounded here; on a platform with 32-bit size_t this sum/multiply
	 * could wrap -- confirm they are range-checked downstream.
	 */
	required_len = sizeof(struct memb_join) + ((proc_list_entries + failed_list_entries) * sizeof(struct srp_addr));
	if (msg_len < required_len) {
		"Received memb_join message is too short... ignoring.");

		return (-1);
	}

	return (0);
}
3758 
3759 static int check_memb_commit_token_sanity(
3760  struct totemsrp_instance *instance,
3761  const void *msg,
3762  size_t msg_len,
3763  int endian_conversion_needed)
3764 {
3765  const struct memb_commit_token *mct_msg = (const struct memb_commit_token *)msg;
3766  unsigned int addr_entries;
3767  size_t required_len;
3768 
3769  if (msg_len < sizeof(struct memb_commit_token)) {
3771  "Received memb_commit_token message is too short... ignoring.");
3772 
3773  return (0);
3774  }
3775 
3776  addr_entries= mct_msg->addr_entries;
3777  if (endian_conversion_needed) {
3778  addr_entries = swab32(addr_entries);
3779  }
3780 
3781  required_len = sizeof(struct memb_commit_token) +
3782  (addr_entries * (sizeof(struct srp_addr) + sizeof(struct memb_commit_token_memb_entry)));
3783  if (msg_len < required_len) {
3785  "Received memb_commit_token message is too short... ignoring.");
3786 
3787  return (-1);
3788  }
3789 
3790  return (0);
3791 }
3792 
/*
 * Validate that a received token_hold_cancel message is at least the
 * size of its fixed structure.  Returns 0 when usable, -1 otherwise.
 */
static int check_token_hold_cancel_sanity(
	struct totemsrp_instance *instance,
	const void *msg,
	size_t msg_len,
	int endian_conversion_needed)
{

	if (msg_len < sizeof(struct token_hold_cancel)) {
		"Received token_hold_cancel message is too short... ignoring.");

		return (-1);
	}

	return (0);
}
3809 
3810 /*
3811  * Message Handlers
3812  */
3813 
/*
 * Timestamp (nanoseconds) of the previous token arrival; used only by
 * the GIVEINFO instrumentation in message_handler_orf_token to report
 * token rotation time.  File scope so it persists between calls.
 */
unsigned long long int tv_old;
3815 /*
3816  * message handler called when TOKEN message type received
3817  */
/*
 * Handler for a received ORF (ordering/reliability/flow-control) TOKEN.
 * Validates and copies the token, services retransmission requests,
 * applies flow control, updates the all-received-up-to (aru) tracking,
 * detects fail-to-receive, runs the recovery-state token protocol, and
 * finally forwards the token unless this (representative) node holds it.
 * Always returns 0; invalid or stale tokens are silently discarded.
 */
static int message_handler_orf_token (
	struct totemsrp_instance *instance,
	const void *msg,
	size_t msg_len,
	int endian_conversion_needed)
{
	char token_storage[1500];	/* local copy: token header + full rtr list */
	char token_convert[1500];	/* endian-converted copy when needed */
	struct orf_token *token = NULL;
	int forward_token;
	unsigned int transmits_allowed;
	unsigned int mcasted_retransmit;
	unsigned int mcasted_regular;
	unsigned int last_aru;

#ifdef GIVEINFO
	unsigned long long tv_current;
	unsigned long long tv_diff;

	tv_current = qb_util_nano_current_get ();
	tv_diff = tv_current - tv_old;
	tv_old = tv_current;

		"Time since last token %0.4f ms", ((float)tv_diff) / 1000000.0);
#endif

	/* Drop malformed tokens before touching any field. */
	if (check_orf_token_sanity(instance, msg, msg_len, sizeof(token_storage),
	    endian_conversion_needed) == -1) {
		return (0);
	}

	if (instance->orf_token_discard) {
		return (0);
	}
#ifdef TEST_DROP_ORF_TOKEN_PERCENTAGE
	if (random()%100 < TEST_DROP_ORF_TOKEN_PERCENTAGE) {
		return (0);
	}
#endif

	if (endian_conversion_needed) {
		orf_token_endian_convert ((struct orf_token *)msg,
			(struct orf_token *)token_convert);
		msg = (struct orf_token *)token_convert;
	}

	/*
	 * Make copy of token and retransmit list in case we have
	 * to flush incoming messages from the kernel queue
	 */
	token = (struct orf_token *)token_storage;
	memcpy (token, msg, sizeof (struct orf_token));
	memcpy (&token->rtr_list[0], (char *)msg + sizeof (struct orf_token),
		sizeof (struct rtr_item) * RETRANSMIT_ENTRIES_MAX);


	/*
	 * Handle merge detection timeout
	 */
	if (token->seq == instance->my_last_seq) {
		start_merge_detect_timeout (instance);
		instance->my_seq_unchanged += 1;
	} else {
		cancel_merge_detect_timeout (instance);
		cancel_token_hold_retransmit_timeout (instance);
		instance->my_seq_unchanged = 0;
	}

	instance->my_last_seq = token->seq;

#ifdef TEST_RECOVERY_MSG_COUNT
	if (instance->memb_state == MEMB_STATE_OPERATIONAL && token->seq > TEST_RECOVERY_MSG_COUNT) {
		return (0);
	}
#endif
	instance->flushing = 1;
	instance->flushing = 0;

	/*
	 * Determine if we should hold (in reality drop) the token
	 */
	instance->my_token_held = 0;
	if (instance->my_ring_id.rep == instance->my_id.nodeid &&
		instance->my_seq_unchanged > instance->totem_config->seqno_unchanged_const) {
		instance->my_token_held = 1;
	} else {
		if (instance->my_ring_id.rep != instance->my_id.nodeid &&
			instance->my_seq_unchanged >= instance->totem_config->seqno_unchanged_const) {
			instance->my_token_held = 1;
		}
	}

	/*
	 * Hold onto token when there is no activity on ring and
	 * this processor is the ring rep
	 */
	forward_token = 1;
	if (instance->my_ring_id.rep == instance->my_id.nodeid) {
		if (instance->my_token_held) {
			forward_token = 0;
		}
	}

	token_callbacks_execute (instance, TOTEM_CALLBACK_TOKEN_RECEIVED);

	switch (instance->memb_state) {
	case MEMB_STATE_COMMIT:
		/* Discard token */
		break;

		messages_free (instance, token->aru);
		/*
		 * Do NOT add break, this case should also execute code in gather case.
		 */

	case MEMB_STATE_GATHER:
		/*
		 * DO NOT add break, we use different free mechanism in recovery state
		 */

	case MEMB_STATE_RECOVERY:
		/*
		 * Discard tokens from another configuration
		 */
		if (memcmp (&token->ring_id, &instance->my_ring_id,
			sizeof (struct memb_ring_id)) != 0) {

			if ((forward_token)
				&& instance->use_heartbeat) {
				reset_heartbeat_timeout(instance);
			}
			else {
				cancel_heartbeat_timeout(instance);
			}

			return (0); /* discard token */
		}

		/*
		 * Discard retransmitted tokens
		 */
		if (sq_lte_compare (token->token_seq, instance->my_token_seq)) {
			return (0); /* discard token */
		}
		last_aru = instance->my_last_aru;
		instance->my_last_aru = token->aru;

		/* Flow control, then service the token's retransmit requests. */
		transmits_allowed = fcc_calculate (instance, token);
		mcasted_retransmit = orf_token_rtr (instance, token, &transmits_allowed);

		/*
		 * A held token must circulate again when messages still
		 * need retransmitting.
		 */
		if (instance->my_token_held == 1 &&
			(token->rtr_list_entries > 0 || mcasted_retransmit > 0)) {
			instance->my_token_held = 0;
			forward_token = 1;
		}

		fcc_rtr_limit (instance, token, &transmits_allowed);
		mcasted_regular = orf_token_mcast (instance, token, transmits_allowed);
/*
if (mcasted_regular) {
printf ("mcasted regular %d\n", mcasted_regular);
printf ("token seq %d\n", token->seq);
}
*/
		fcc_token_update (instance, token, mcasted_retransmit +
			mcasted_regular);

		/*
		 * Update the token's aru (all-received-up-to) field with
		 * this node's aru when ours is lower, we set it last time
		 * around, or it is unset.
		 */
		if (sq_lt_compare (instance->my_aru, token->aru) ||
			instance->my_id.nodeid == token->aru_addr ||
			token->aru_addr == 0) {

			token->aru = instance->my_aru;
			if (token->aru == token->seq) {
				token->aru_addr = 0;
			} else {
				token->aru_addr = instance->my_id.nodeid;
			}
		}
		/* Count consecutive rotations without aru progress. */
		if (token->aru == last_aru && token->aru_addr != 0) {
			instance->my_aru_count += 1;
		} else {
			instance->my_aru_count = 0;
		}

		/*
		 * We really don't follow specification there. In specification, OTHER nodes
		 * detect failure of one node (based on aru_count) and my_id IS NEVER added
		 * to failed list (so node never mark itself as failed)
		 */
		if (instance->my_aru_count > instance->totem_config->fail_to_recv_const &&
			token->aru_addr == instance->my_id.nodeid) {

			"FAILED TO RECEIVE");

			instance->failed_to_recv = 1;

			memb_set_merge (&instance->my_id, 1,
				instance->my_failed_list,
				&instance->my_failed_list_entries);

			memb_state_gather_enter (instance, TOTEMSRP_GSFROM_FAILED_TO_RECEIVE);
		} else {
			instance->my_token_seq = token->token_seq;
			token->token_seq += 1;

			if (instance->memb_state == MEMB_STATE_RECOVERY) {
				/*
				 * instance->my_aru == instance->my_high_seq_received means this processor
				 * has recovered all messages it can recover
				 * (ie: its retrans queue is empty)
				 */
				if (cs_queue_is_empty (&instance->retrans_message_queue) == 0) {

					if (token->retrans_flg == 0) {
						token->retrans_flg = 1;
						instance->my_set_retrans_flg = 1;
					}
				} else
				if (token->retrans_flg == 1 && instance->my_set_retrans_flg) {
					token->retrans_flg = 0;
					instance->my_set_retrans_flg = 0;
				}
				"token retrans flag is %d my set retrans flag%d retrans queue empty %d count %d, aru %x",
				token->retrans_flg, instance->my_set_retrans_flg,
				cs_queue_is_empty (&instance->retrans_message_queue),
				instance->my_retrans_flg_count, token->aru);
				/* Count rotations with retrans_flg clear. */
				if (token->retrans_flg == 0) {
					instance->my_retrans_flg_count += 1;
				} else {
					instance->my_retrans_flg_count = 0;
				}
				if (instance->my_retrans_flg_count == 2) {
					instance->my_install_seq = token->seq;
				}
				"install seq %x aru %x high seq received %x",
				instance->my_install_seq, instance->my_aru, instance->my_high_seq_received);
				if (instance->my_retrans_flg_count >= 2 &&
					instance->my_received_flg == 0 &&
					sq_lte_compare (instance->my_install_seq, instance->my_aru)) {
					instance->my_received_flg = 1;
					instance->my_deliver_memb_entries = instance->my_trans_memb_entries;
					memcpy (instance->my_deliver_memb_list, instance->my_trans_memb_list,
						sizeof (struct totem_ip_address) * instance->my_trans_memb_entries);
				}
				if (instance->my_retrans_flg_count >= 3 &&
					sq_lte_compare (instance->my_install_seq, token->aru)) {
					instance->my_rotation_counter += 1;
				} else {
					instance->my_rotation_counter = 0;
				}
				/* All nodes recovered: install the new configuration. */
				if (instance->my_rotation_counter == 2) {
				"retrans flag count %x token aru %x install seq %x aru %x %x",
				instance->my_retrans_flg_count, token->aru, instance->my_install_seq,
				instance->my_aru, token->seq);

					memb_state_operational_enter (instance);
					instance->my_rotation_counter = 0;
					instance->my_retrans_flg_count = 0;
				}
			}

			token_send (instance, token, forward_token);

#ifdef GIVEINFO
			tv_current = qb_util_nano_current_get ();
			tv_diff = tv_current - tv_old;
			tv_old = tv_current;
				"I held %0.4f ms",
				((float)tv_diff) / 1000000.0);
#endif
			if (instance->memb_state == MEMB_STATE_OPERATIONAL) {
				messages_deliver_to_app (instance, 0,
					instance->my_high_seq_received);
			}

			/*
			 * Deliver messages after token has been transmitted
			 * to improve performance
			 */
			reset_token_timeout (instance); // REVIEWED
			reset_token_retransmit_timeout (instance); // REVIEWED
			if (instance->my_id.nodeid == instance->my_ring_id.rep &&
				instance->my_token_held == 1) {

				start_token_hold_retransmit_timeout (instance);
			}

			token_callbacks_execute (instance, TOTEM_CALLBACK_TOKEN_SENT);
		}
		break;
	}

	if ((forward_token)
		&& instance->use_heartbeat) {
		reset_heartbeat_timeout(instance);
	}
	else {
		cancel_heartbeat_timeout(instance);
	}

	return (0);
}
4129 
/*
 * Deliver messages in sequence order from the regular sort queue to the
 * application via totemsrp_deliver_fn, advancing my_high_delivered up
 * to end_point.  With skip == 0 delivery stops at the first hole; with
 * skip != 0 (recovery) holes are skipped and messages not originated by
 * a member of my_deliver_memb_list are dropped.
 */
static void messages_deliver_to_app (
	struct totemsrp_instance *instance,
	int skip,
	unsigned int end_point)
{
	struct sort_queue_item *sort_queue_item_p;
	unsigned int i;
	int res;
	struct mcast *mcast_in;
	struct mcast mcast_header;
	unsigned int range = 0;
	int endian_conversion_required;
	unsigned int my_high_delivered_stored = 0;


	/* Sequence numbers wrap; range is modular (unsigned) distance. */
	range = end_point - instance->my_high_delivered;

	if (range) {
		"Delivering %x to %x", instance->my_high_delivered,
		end_point);
	}
	assert (range < QUEUE_RTR_ITEMS_SIZE_MAX);
	my_high_delivered_stored = instance->my_high_delivered;

	/*
	 * Deliver messages in order from rtr queue to pending delivery queue
	 */
	for (i = 1; i <= range; i++) {

		void *ptr = 0;

		/*
		 * If out of range of sort queue, stop assembly
		 */
		res = sq_in_range (&instance->regular_sort_queue,
			my_high_delivered_stored + i);
		if (res == 0) {
			break;
		}

		res = sq_item_get (&instance->regular_sort_queue,
			my_high_delivered_stored + i, &ptr);
		/*
		 * If hole, stop assembly
		 */
		if (res != 0 && skip == 0) {
			break;
		}

		instance->my_high_delivered = my_high_delivered_stored + i;

		if (res != 0) {
			/* Hole while skipping: advance past it. */
			continue;

		}

		sort_queue_item_p = ptr;

		mcast_in = sort_queue_item_p->mcast;
		assert (mcast_in != (struct mcast *)0xdeadbeef);

		/* A wrong magic means the sender had opposite endianness. */
		endian_conversion_required = 0;
		if (mcast_in->header.magic != TOTEM_MH_MAGIC) {
			endian_conversion_required = 1;
			mcast_endian_convert (mcast_in, &mcast_header);
		} else {
			memcpy (&mcast_header, mcast_in, sizeof (struct mcast));
		}

		/*
		 * Skip messages not originated in instance->my_deliver_memb
		 */
		if (skip &&
			memb_set_subset (&mcast_header.system_from,
			1,
			instance->my_deliver_memb_list,
			instance->my_deliver_memb_entries) == 0) {

			instance->my_high_delivered = my_high_delivered_stored + i;

			continue;
		}

		/*
		 * Message found
		 */
		"Delivering MCAST message with seq %x to pending delivery queue",
		mcast_header.seq);

		/*
		 * Message is locally originated multicast
		 */
		instance->totemsrp_deliver_fn (
			mcast_header.header.nodeid,
			((char *)sort_queue_item_p->mcast) + sizeof (struct mcast),
			sort_queue_item_p->msg_len - sizeof (struct mcast),
			endian_conversion_required);
	}
}
4231 
4232 /*
4233  * recv message handler called when MCAST message type received
4234  */
/*
 * recv message handler called when MCAST message type received
 *
 * Stores the message in the appropriate sort queue (recovery queue for
 * encapsulated messages, regular queue otherwise), triggers a membership
 * gather when the message comes from a foreign configuration, and then
 * delivers any newly contiguous messages while operational.
 */
static int message_handler_mcast (
	struct totemsrp_instance *instance,
	const void *msg,
	size_t msg_len,
	int endian_conversion_needed)
{
	struct sort_queue_item sort_queue_item;
	struct sq *sort_queue;
	struct mcast mcast_header;

	if (check_mcast_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) {
		return (0);
	}

	if (endian_conversion_needed) {
		mcast_endian_convert (msg, &mcast_header);
	} else {
		memcpy (&mcast_header, msg, sizeof (struct mcast));
	}

	/* Encapsulated messages are routed to the recovery sort queue. */
	if (mcast_header.header.encapsulated == MESSAGE_ENCAPSULATED) {
		sort_queue = &instance->recovery_sort_queue;
	} else {
		sort_queue = &instance->regular_sort_queue;
	}

	assert (msg_len <= FRAME_SIZE_MAX);

#ifdef TEST_DROP_MCAST_PERCENTAGE
	if (random()%100 < TEST_DROP_MCAST_PERCENTAGE) {
		return (0);
	}
#endif

	/*
	 * If the message is foreign execute the switch below
	 */
	if (memcmp (&instance->my_ring_id, &mcast_header.ring_id,
		sizeof (struct memb_ring_id)) != 0) {

		switch (instance->memb_state) {
			memb_set_merge (
				&mcast_header.system_from, 1,
				instance->my_proc_list, &instance->my_proc_list_entries);
			memb_state_gather_enter (instance, TOTEMSRP_GSFROM_FOREIGN_MESSAGE_IN_OPERATIONAL_STATE);
			break;

		case MEMB_STATE_GATHER:
			/* Only re-enter gather for senders we have not yet merged. */
			if (!memb_set_subset (
				&mcast_header.system_from,
				1,
				instance->my_proc_list,
				instance->my_proc_list_entries)) {

				memb_set_merge (&mcast_header.system_from, 1,
					instance->my_proc_list, &instance->my_proc_list_entries);
				memb_state_gather_enter (instance, TOTEMSRP_GSFROM_FOREIGN_MESSAGE_IN_GATHER_STATE);
				return (0);
			}
			break;

		case MEMB_STATE_COMMIT:
			/* discard message */
			instance->stats.rx_msg_dropped++;
			break;

		case MEMB_STATE_RECOVERY:
			/* discard message */
			instance->stats.rx_msg_dropped++;
			break;
		}
		return (0);
	}

	"Received ringid(%u:%lld) seq %x",
	mcast_header.ring_id.rep,
	mcast_header.ring_id.seq,
	mcast_header.seq);

	/*
	 * Add mcast message to rtr queue if not already in rtr queue
	 * otherwise free io vectors
	 */
	if (msg_len > 0 && msg_len <= FRAME_SIZE_MAX &&
		sq_in_range (sort_queue, mcast_header.seq) &&
		sq_item_inuse (sort_queue, mcast_header.seq) == 0) {

		/*
		 * Allocate new multicast memory block
		 */
// TODO LEAK
		sort_queue_item.mcast = totemsrp_buffer_alloc (instance);
		if (sort_queue_item.mcast == NULL) {
			return (-1); /* error here is corrected by the algorithm */
		}
		memcpy (sort_queue_item.mcast, msg, msg_len);
		sort_queue_item.msg_len = msg_len;

		/* Track the highest sequence number seen so far. */
		if (sq_lt_compare (instance->my_high_seq_received,
			mcast_header.seq)) {
			instance->my_high_seq_received = mcast_header.seq;
		}

		sq_item_add (sort_queue, &sort_queue_item, mcast_header.seq);
	}

	update_aru (instance);
	if (instance->memb_state == MEMB_STATE_OPERATIONAL) {
		messages_deliver_to_app (instance, 0, instance->my_high_seq_received);
	}

/* TODO remove from retrans message queue for old ring in recovery state */
	return (0);
}
4351 
4352 static int message_handler_memb_merge_detect (
4353  struct totemsrp_instance *instance,
4354  const void *msg,
4355  size_t msg_len,
4356  int endian_conversion_needed)
4357 {
4358  struct memb_merge_detect memb_merge_detect;
4359 
4360  if (check_memb_merge_detect_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) {
4361  return (0);
4362  }
4363 
4364  if (endian_conversion_needed) {
4365  memb_merge_detect_endian_convert (msg, &memb_merge_detect);
4366  } else {
4367  memcpy (&memb_merge_detect, msg,
4368  sizeof (struct memb_merge_detect));
4369  }
4370 
4371  /*
4372  * do nothing if this is a merge detect from this configuration
4373  */
4374  if (memcmp (&instance->my_ring_id, &memb_merge_detect.ring_id,
4375  sizeof (struct memb_ring_id)) == 0) {
4376 
4377  return (0);
4378  }
4379 
4380  /*
4381  * Execute merge operation
4382  */
4383  switch (instance->memb_state) {
4385  memb_set_merge (&memb_merge_detect.system_from, 1,
4386  instance->my_proc_list, &instance->my_proc_list_entries);
4387  memb_state_gather_enter (instance, TOTEMSRP_GSFROM_MERGE_DURING_OPERATIONAL_STATE);
4388  break;
4389 
4390  case MEMB_STATE_GATHER:
4391  if (!memb_set_subset (
4392  &memb_merge_detect.system_from,
4393  1,
4394  instance->my_proc_list,
4395  instance->my_proc_list_entries)) {
4396 
4397  memb_set_merge (&memb_merge_detect.system_from, 1,
4398  instance->my_proc_list, &instance->my_proc_list_entries);
4399  memb_state_gather_enter (instance, TOTEMSRP_GSFROM_MERGE_DURING_GATHER_STATE);
4400  return (0);
4401  }
4402  break;
4403 
4404  case MEMB_STATE_COMMIT:
4405  /* do nothing in commit */
4406  break;
4407 
4408  case MEMB_STATE_RECOVERY:
4409  /* do nothing in recovery */
4410  break;
4411  }
4412  return (0);
4413 }
4414 
4415 static void memb_join_process (
4416  struct totemsrp_instance *instance,
4417  const struct memb_join *memb_join)
4418 {
4419  struct srp_addr *proc_list;
4420  struct srp_addr *failed_list;
4421  int gather_entered = 0;
4422  int fail_minus_memb_entries = 0;
4423  struct srp_addr fail_minus_memb[PROCESSOR_COUNT_MAX];
4424 
4425  proc_list = (struct srp_addr *)memb_join->end_of_memb_join;
4426  failed_list = proc_list + memb_join->proc_list_entries;
4427 
4428  log_printf(instance->totemsrp_log_level_trace, "memb_join_process");
4429  memb_set_log(instance, instance->totemsrp_log_level_trace,
4430  "proclist", proc_list, memb_join->proc_list_entries);
4431  memb_set_log(instance, instance->totemsrp_log_level_trace,
4432  "faillist", failed_list, memb_join->failed_list_entries);
4433  memb_set_log(instance, instance->totemsrp_log_level_trace,
4434  "my_proclist", instance->my_proc_list, instance->my_proc_list_entries);
4435  memb_set_log(instance, instance->totemsrp_log_level_trace,
4436  "my_faillist", instance->my_failed_list, instance->my_failed_list_entries);
4437 
4438  if (memb_join->header.type == MESSAGE_TYPE_MEMB_JOIN) {
4439  if (instance->flushing) {
4440  if (memb_join->header.nodeid == LEAVE_DUMMY_NODEID) {
4442  "Discarding LEAVE message during flush, nodeid=%u",
4443  memb_join->failed_list_entries > 0 ? failed_list[memb_join->failed_list_entries - 1 ].nodeid : LEAVE_DUMMY_NODEID);
4444  if (memb_join->failed_list_entries > 0) {
4445  my_leave_memb_set(instance, failed_list[memb_join->failed_list_entries - 1 ].nodeid);
4446  }
4447  } else {
4449  "Discarding JOIN message during flush, nodeid=%d", memb_join->header.nodeid);
4450  }
4451  return;
4452  } else {
4453  if (memb_join->header.nodeid == LEAVE_DUMMY_NODEID) {
4455  "Received LEAVE message from %u", memb_join->failed_list_entries > 0 ? failed_list[memb_join->failed_list_entries - 1 ].nodeid : LEAVE_DUMMY_NODEID);
4456  if (memb_join->failed_list_entries > 0) {
4457  my_leave_memb_set(instance, failed_list[memb_join->failed_list_entries - 1 ].nodeid);
4458  }
4459  }
4460  }
4461 
4462  }
4463 
4464  if (memb_set_equal (proc_list,
4465  memb_join->proc_list_entries,
4466  instance->my_proc_list,
4467  instance->my_proc_list_entries) &&
4468 
4469  memb_set_equal (failed_list,
4470  memb_join->failed_list_entries,
4471  instance->my_failed_list,
4472  instance->my_failed_list_entries)) {
4473 
4474  if (memb_join->header.nodeid != LEAVE_DUMMY_NODEID) {
4475  memb_consensus_set (instance, &memb_join->system_from);
4476  }
4477 
4478  if (memb_consensus_agreed (instance) && instance->failed_to_recv == 1) {
4479  instance->failed_to_recv = 0;
4480  srp_addr_copy (&instance->my_proc_list[0],
4481  &instance->my_id);
4482  instance->my_proc_list_entries = 1;
4483  instance->my_failed_list_entries = 0;
4484 
4485  memb_state_commit_token_create (instance);
4486 
4487  memb_state_commit_enter (instance);
4488  return;
4489  }
4490  if (memb_consensus_agreed (instance) &&
4491  memb_lowest_in_config (instance)) {
4492 
4493  memb_state_commit_token_create (instance);
4494 
4495  memb_state_commit_enter (instance);
4496  } else {
4497  goto out;
4498  }
4499  } else
4500  if (memb_set_subset (proc_list,
4501  memb_join->proc_list_entries,
4502  instance->my_proc_list,
4503  instance->my_proc_list_entries) &&
4504 
4505  memb_set_subset (failed_list,
4506  memb_join->failed_list_entries,
4507  instance->my_failed_list,
4508  instance->my_failed_list_entries)) {
4509 
4510  goto out;
4511  } else
4512  if (memb_set_subset (&memb_join->system_from, 1,
4513  instance->my_failed_list, instance->my_failed_list_entries)) {
4514 
4515  goto out;
4516  } else {
4517  memb_set_merge (proc_list,
4518  memb_join->proc_list_entries,
4519  instance->my_proc_list, &instance->my_proc_list_entries);
4520 
4521  if (memb_set_subset (
4522  &instance->my_id, 1,
4523  failed_list, memb_join->failed_list_entries)) {
4524 
4525  memb_set_merge (
4526  &memb_join->system_from, 1,
4527  instance->my_failed_list, &instance->my_failed_list_entries);
4528  } else {
4529  if (memb_set_subset (
4530  &memb_join->system_from, 1,
4531  instance->my_memb_list,
4532  instance->my_memb_entries)) {
4533 
4534  if (memb_set_subset (
4535  &memb_join->system_from, 1,
4536  instance->my_failed_list,
4537  instance->my_failed_list_entries) == 0) {
4538 
4539  memb_set_merge (failed_list,
4540  memb_join->failed_list_entries,
4541  instance->my_failed_list, &instance->my_failed_list_entries);
4542  } else {
4543  memb_set_subtract (fail_minus_memb,
4544  &fail_minus_memb_entries,
4545  failed_list,
4546  memb_join->failed_list_entries,
4547  instance->my_memb_list,
4548  instance->my_memb_entries);
4549 
4550  memb_set_merge (fail_minus_memb,
4551  fail_minus_memb_entries,
4552  instance->my_failed_list,
4553  &instance->my_failed_list_entries);
4554  }
4555  }
4556  }
4557  memb_state_gather_enter (instance, TOTEMSRP_GSFROM_MERGE_DURING_JOIN);
4558  gather_entered = 1;
4559  }
4560 
4561 out:
4562  if (gather_entered == 0 &&
4563  instance->memb_state == MEMB_STATE_OPERATIONAL) {
4564 
4565  memb_state_gather_enter (instance, TOTEMSRP_GSFROM_JOIN_DURING_OPERATIONAL_STATE);
4566  }
4567 }
4568 
4569 static void memb_join_endian_convert (const struct memb_join *in, struct memb_join *out)
4570 {
4571  int i;
4572  struct srp_addr *in_proc_list;
4573  struct srp_addr *in_failed_list;
4574  struct srp_addr *out_proc_list;
4575  struct srp_addr *out_failed_list;
4576 
4577  out->header.magic = TOTEM_MH_MAGIC;
4579  out->header.type = in->header.type;
4580  out->header.nodeid = swab32 (in->header.nodeid);
4581  srp_addr_copy_endian_convert (&out->system_from, &in->system_from);
4584  out->ring_seq = swab64 (in->ring_seq);
4585 
4586  in_proc_list = (struct srp_addr *)in->end_of_memb_join;
4587  in_failed_list = in_proc_list + out->proc_list_entries;
4588  out_proc_list = (struct srp_addr *)out->end_of_memb_join;
4589  out_failed_list = out_proc_list + out->proc_list_entries;
4590 
4591  for (i = 0; i < out->proc_list_entries; i++) {
4592  srp_addr_copy_endian_convert (&out_proc_list[i], &in_proc_list[i]);
4593  }
4594  for (i = 0; i < out->failed_list_entries; i++) {
4595  srp_addr_copy_endian_convert (&out_failed_list[i], &in_failed_list[i]);
4596  }
4597 }
4598 
4599 static void memb_commit_token_endian_convert (const struct memb_commit_token *in, struct memb_commit_token *out)
4600 {
4601  int i;
4602  struct srp_addr *in_addr = (struct srp_addr *)in->end_of_commit_token;
4603  struct srp_addr *out_addr = (struct srp_addr *)out->end_of_commit_token;
4604  struct memb_commit_token_memb_entry *in_memb_list;
4605  struct memb_commit_token_memb_entry *out_memb_list;
4606 
4607  out->header.magic = TOTEM_MH_MAGIC;
4609  out->header.type = in->header.type;
4610  out->header.nodeid = swab32 (in->header.nodeid);
4611  out->token_seq = swab32 (in->token_seq);
4612  out->ring_id.rep = swab32(in->ring_id.rep);
4613  out->ring_id.seq = swab64 (in->ring_id.seq);
4614  out->retrans_flg = swab32 (in->retrans_flg);
4615  out->memb_index = swab32 (in->memb_index);
4616  out->addr_entries = swab32 (in->addr_entries);
4617 
4618  in_memb_list = (struct memb_commit_token_memb_entry *)(in_addr + out->addr_entries);
4619  out_memb_list = (struct memb_commit_token_memb_entry *)(out_addr + out->addr_entries);
4620  for (i = 0; i < out->addr_entries; i++) {
4621  srp_addr_copy_endian_convert (&out_addr[i], &in_addr[i]);
4622 
4623  /*
4624  * Only convert the memb entry if it has been set
4625  */
4626  if (in_memb_list[i].ring_id.rep != 0) {
4627  out_memb_list[i].ring_id.rep = swab32(in_memb_list[i].ring_id.rep);
4628 
4629  out_memb_list[i].ring_id.seq =
4630  swab64 (in_memb_list[i].ring_id.seq);
4631  out_memb_list[i].aru = swab32 (in_memb_list[i].aru);
4632  out_memb_list[i].high_delivered = swab32 (in_memb_list[i].high_delivered);
4633  out_memb_list[i].received_flg = swab32 (in_memb_list[i].received_flg);
4634  }
4635  }
4636 }
4637 
4638 static void orf_token_endian_convert (const struct orf_token *in, struct orf_token *out)
4639 {
4640  int i;
4641 
4642  out->header.magic = TOTEM_MH_MAGIC;
4644  out->header.type = in->header.type;
4645  out->header.nodeid = swab32 (in->header.nodeid);
4646  out->seq = swab32 (in->seq);
4647  out->token_seq = swab32 (in->token_seq);
4648  out->aru = swab32 (in->aru);
4649  out->ring_id.rep = swab32(in->ring_id.rep);
4650  out->aru_addr = swab32(in->aru_addr);
4651  out->ring_id.seq = swab64 (in->ring_id.seq);
4652  out->fcc = swab32 (in->fcc);
4653  out->backlog = swab32 (in->backlog);
4654  out->retrans_flg = swab32 (in->retrans_flg);
4656  for (i = 0; i < out->rtr_list_entries; i++) {
4657  out->rtr_list[i].ring_id.rep = swab32(in->rtr_list[i].ring_id.rep);
4658  out->rtr_list[i].ring_id.seq = swab64 (in->rtr_list[i].ring_id.seq);
4659  out->rtr_list[i].seq = swab32 (in->rtr_list[i].seq);
4660  }
4661 }
4662 
4663 static void mcast_endian_convert (const struct mcast *in, struct mcast *out)
4664 {
4665  out->header.magic = TOTEM_MH_MAGIC;
4667  out->header.type = in->header.type;
4668  out->header.nodeid = swab32 (in->header.nodeid);
4670 
4671  out->seq = swab32 (in->seq);
4672  out->this_seqno = swab32 (in->this_seqno);
4673  out->ring_id.rep = swab32(in->ring_id.rep);
4674  out->ring_id.seq = swab64 (in->ring_id.seq);
4675  out->node_id = swab32 (in->node_id);
4676  out->guarantee = swab32 (in->guarantee);
4677  srp_addr_copy_endian_convert (&out->system_from, &in->system_from);
4678 }
4679 
4680 static void memb_merge_detect_endian_convert (
4681  const struct memb_merge_detect *in,
4682  struct memb_merge_detect *out)
4683 {
4684  out->header.magic = TOTEM_MH_MAGIC;
4686  out->header.type = in->header.type;
4687  out->header.nodeid = swab32 (in->header.nodeid);
4688  out->ring_id.rep = swab32(in->ring_id.rep);
4689  out->ring_id.seq = swab64 (in->ring_id.seq);
4690  srp_addr_copy_endian_convert (&out->system_from, &in->system_from);
4691 }
4692 
4693 static int ignore_join_under_operational (
4694  struct totemsrp_instance *instance,
4695  const struct memb_join *memb_join)
4696 {
4697  struct srp_addr *proc_list;
4698  struct srp_addr *failed_list;
4699  unsigned long long ring_seq;
4700 
4701  proc_list = (struct srp_addr *)memb_join->end_of_memb_join;
4702  failed_list = proc_list + memb_join->proc_list_entries;
4703  ring_seq = memb_join->ring_seq;
4704 
4705  if (memb_set_subset (&instance->my_id, 1,
4706  failed_list, memb_join->failed_list_entries)) {
4707  return (1);
4708  }
4709 
4710  /*
4711  * In operational state, my_proc_list is exactly the same as
4712  * my_memb_list.
4713  */
4714  if ((memb_set_subset (&memb_join->system_from, 1,
4715  instance->my_memb_list, instance->my_memb_entries)) &&
4716  (ring_seq < instance->my_ring_id.seq)) {
4717  return (1);
4718  }
4719 
4720  return (0);
4721 }
4722 
4723 static int message_handler_memb_join (
4724  struct totemsrp_instance *instance,
4725  const void *msg,
4726  size_t msg_len,
4727  int endian_conversion_needed)
4728 {
4729  const struct memb_join *memb_join;
4730  struct memb_join *memb_join_convert = alloca (msg_len);
4731 
4732  if (check_memb_join_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) {
4733  return (0);
4734  }
4735 
4736  if (endian_conversion_needed) {
4737  memb_join = memb_join_convert;
4738  memb_join_endian_convert (msg, memb_join_convert);
4739 
4740  } else {
4741  memb_join = msg;
4742  }
4743  /*
4744  * If the process paused because it wasn't scheduled in a timely
4745  * fashion, flush the join messages because they may be queued
4746  * entries
4747  */
4748  if (pause_flush (instance)) {
4749  return (0);
4750  }
4751 
4752  if (instance->token_ring_id_seq < memb_join->ring_seq) {
4753  instance->token_ring_id_seq = memb_join->ring_seq;
4754  }
4755  switch (instance->memb_state) {
4757  if (!ignore_join_under_operational (instance, memb_join)) {
4758  memb_join_process (instance, memb_join);
4759  }
4760  break;
4761 
4762  case MEMB_STATE_GATHER:
4763  memb_join_process (instance, memb_join);
4764  break;
4765 
4766  case MEMB_STATE_COMMIT:
4767  if (memb_set_subset (&memb_join->system_from,
4768  1,
4769  instance->my_new_memb_list,
4770  instance->my_new_memb_entries) &&
4771 
4772  memb_join->ring_seq >= instance->my_ring_id.seq) {
4773 
4774  memb_join_process (instance, memb_join);
4775  memb_state_gather_enter (instance, TOTEMSRP_GSFROM_JOIN_DURING_COMMIT_STATE);
4776  }
4777  break;
4778 
4779  case MEMB_STATE_RECOVERY:
4780  if (memb_set_subset (&memb_join->system_from,
4781  1,
4782  instance->my_new_memb_list,
4783  instance->my_new_memb_entries) &&
4784 
4785  memb_join->ring_seq >= instance->my_ring_id.seq) {
4786 
4787  memb_join_process (instance, memb_join);
4788  memb_recovery_state_token_loss (instance);
4789  memb_state_gather_enter (instance, TOTEMSRP_GSFROM_JOIN_DURING_RECOVERY);
4790  }
4791  break;
4792  }
4793  return (0);
4794 }
4795 
4796 static int message_handler_memb_commit_token (
4797  struct totemsrp_instance *instance,
4798  const void *msg,
4799  size_t msg_len,
4800  int endian_conversion_needed)
4801 {
4802  struct memb_commit_token *memb_commit_token_convert = alloca (msg_len);
4803  struct memb_commit_token *memb_commit_token;
4804  struct srp_addr sub[PROCESSOR_COUNT_MAX];
4805  int sub_entries;
4806 
4807  struct srp_addr *addr;
4808 
4810  "got commit token");
4811 
4812  if (check_memb_commit_token_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) {
4813  return (0);
4814  }
4815 
4816  if (endian_conversion_needed) {
4817  memb_commit_token_endian_convert (msg, memb_commit_token_convert);
4818  } else {
4819  memcpy (memb_commit_token_convert, msg, msg_len);
4820  }
4821  memb_commit_token = memb_commit_token_convert;
4822  addr = (struct srp_addr *)memb_commit_token->end_of_commit_token;
4823 
4824 #ifdef TEST_DROP_COMMIT_TOKEN_PERCENTAGE
4825  if (random()%100 < TEST_DROP_COMMIT_TOKEN_PERCENTAGE) {
4826  return (0);
4827  }
4828 #endif
4829  switch (instance->memb_state) {
4831  /* discard token */
4832  break;
4833 
4834  case MEMB_STATE_GATHER:
4835  memb_set_subtract (sub, &sub_entries,
4836  instance->my_proc_list, instance->my_proc_list_entries,
4837  instance->my_failed_list, instance->my_failed_list_entries);
4838 
4839  if (memb_set_equal (addr,
4840  memb_commit_token->addr_entries,
4841  sub,
4842  sub_entries) &&
4843 
4844  memb_commit_token->ring_id.seq > instance->my_ring_id.seq) {
4845  memcpy (instance->commit_token, memb_commit_token, msg_len);
4846  memb_state_commit_enter (instance);
4847  }
4848  break;
4849 
4850  case MEMB_STATE_COMMIT:
4851  /*
4852  * If retransmitted commit tokens are sent on this ring
4853  * filter them out and only enter recovery once the
4854  * commit token has traversed the array. This is
4855  * determined by :
4856  * memb_commit_token->memb_index == memb_commit_token->addr_entries) {
4857  */
4858  if (memb_commit_token->ring_id.seq == instance->my_ring_id.seq &&
4859  memb_commit_token->memb_index == memb_commit_token->addr_entries) {
4860  memb_state_recovery_enter (instance, memb_commit_token);
4861  }
4862  break;
4863 
4864  case MEMB_STATE_RECOVERY:
4865  if (instance->my_id.nodeid == instance->my_ring_id.rep) {
4866 
4867  /* Filter out duplicated tokens */
4868  if (instance->originated_orf_token) {
4869  break;
4870  }
4871 
4872  instance->originated_orf_token = 1;
4873 
4875  "Sending initial ORF token");
4876 
4877  // TODO convert instead of initiate
4878  orf_token_send_initial (instance);
4879  reset_token_timeout (instance); // REVIEWED
4880  reset_token_retransmit_timeout (instance); // REVIEWED
4881  }
4882  break;
4883  }
4884  return (0);
4885 }
4886 
4887 static int message_handler_token_hold_cancel (
4888  struct totemsrp_instance *instance,
4889  const void *msg,
4890  size_t msg_len,
4891  int endian_conversion_needed)
4892 {
4893  const struct token_hold_cancel *token_hold_cancel = msg;
4894 
4895  if (check_token_hold_cancel_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) {
4896  return (0);
4897  }
4898 
4899  if (memcmp (&token_hold_cancel->ring_id, &instance->my_ring_id,
4900  sizeof (struct memb_ring_id)) == 0) {
4901 
4902  instance->my_seq_unchanged = 0;
4903  if (instance->my_ring_id.rep == instance->my_id.nodeid) {
4904  timer_function_token_retransmit_timeout (instance);
4905  }
4906  }
4907  return (0);
4908 }
4909 
4910 static int check_message_header_validity(
4911  void *context,
4912  const void *msg,
4913  unsigned int msg_len,
4914  const struct sockaddr_storage *system_from)
4915 {
4916  struct totemsrp_instance *instance = context;
4917  const struct totem_message_header *message_header = msg;
4918  const char *guessed_str;
4919  const char *msg_byte = msg;
4920 
4921  if (msg_len < sizeof (struct totem_message_header)) {
4923  "Message received from %s is too short... Ignoring %u.",
4924  totemip_sa_print((struct sockaddr *)system_from), (unsigned int)msg_len);
4925  return (-1);
4926  }
4927 
4928  if (message_header->magic != TOTEM_MH_MAGIC &&
4929  message_header->magic != swab16(TOTEM_MH_MAGIC)) {
4930  /*
4931  * We've received ether Knet, old version of Corosync,
4932  * or something else. Do some guessing to display (hopefully)
4933  * helpful message
4934  */
4935  guessed_str = NULL;
4936 
4937  if (message_header->magic == 0xFFFF) {
4938  /*
4939  * Corosync 2.2 used header with two UINT8_MAX
4940  */
4941  guessed_str = "Corosync 2.2";
4942  } else if (message_header->magic == 0xFEFE) {
4943  /*
4944  * Corosync 2.3+ used header with two UINT8_MAX - 1
4945  */
4946  guessed_str = "Corosync 2.3+";
4947  } else if (msg_byte[0] == 0x01) {
4948  /*
4949  * Knet has stable1 with first byte of message == 1
4950  */
4951  guessed_str = "unencrypted Kronosnet";
4952  } else if (msg_byte[0] >= 0 && msg_byte[0] <= 5) {
4953  /*
4954  * Unencrypted Corosync 1.x/OpenAIS has first byte
4955  * 0-5. Collision with Knet (but still worth the try)
4956  */
4957  guessed_str = "unencrypted Corosync 2.0/2.1/1.x/OpenAIS";
4958  } else {
4959  /*
4960  * Encrypted Kronosned packet has a hash at the end of
4961  * the packet and nothing specific at the beginning of the
4962  * packet (just encrypted data).
4963  * Encrypted Corosync 1.x/OpenAIS is quite similar but hash_digest
4964  * is in the beginning of the packet.
4965  *
4966  * So it's not possible to reliably detect ether of them.
4967  */
4968  guessed_str = "encrypted Kronosnet/Corosync 2.0/2.1/1.x/OpenAIS or unknown";
4969  }
4970 
4972  "Message received from %s has bad magic number (probably sent by %s).. Ignoring",
4973  totemip_sa_print((struct sockaddr *)system_from),
4974  guessed_str);
4975 
4976  return (-1);
4977  }
4978 
4979  if (message_header->version != TOTEM_MH_VERSION) {
4981  "Message received from %s has unsupported version %u... Ignoring",
4982  totemip_sa_print((struct sockaddr *)system_from),
4983  message_header->version);
4984 
4985  return (-1);
4986  }
4987 
4988  return (0);
4989 }
4990 
4991 
4993  void *context,
4994  const void *msg,
4995  unsigned int msg_len,
4996  const struct sockaddr_storage *system_from)
4997 {
4998  struct totemsrp_instance *instance = context;
4999  const struct totem_message_header *message_header = msg;
5000 
5001  if (check_message_header_validity(context, msg, msg_len, system_from) == -1) {
5002  return ;
5003  }
5004 
5005  switch (message_header->type) {
5007  instance->stats.orf_token_rx++;
5008  break;
5009  case MESSAGE_TYPE_MCAST:
5010  instance->stats.mcast_rx++;
5011  break;
5013  instance->stats.memb_merge_detect_rx++;
5014  break;
5016  instance->stats.memb_join_rx++;
5017  break;
5019  instance->stats.memb_commit_token_rx++;
5020  break;
5022  instance->stats.token_hold_cancel_rx++;
5023  break;
5024  default:
5026  "Message received from %s has wrong type... ignoring %d.\n",
5027  totemip_sa_print((struct sockaddr *)system_from),
5028  (int)message_header->type);
5029 
5030  instance->stats.rx_msg_dropped++;
5031  return;
5032  }
5033  /*
5034  * Handle incoming message
5035  */
5036  totemsrp_message_handlers.handler_functions[(int)message_header->type] (
5037  instance,
5038  msg,
5039  msg_len,
5040  message_header->magic != TOTEM_MH_MAGIC);
5041 }
5042 
5044  void *context,
5045  const struct totem_ip_address *interface_addr,
5046  unsigned short ip_port,
5047  unsigned int iface_no)
5048 {
5049  struct totemsrp_instance *instance = context;
5050  int res;
5051 
5052  totemip_copy(&instance->my_addrs[iface_no], interface_addr);
5053 
5054  res = totemnet_iface_set (
5055  instance->totemnet_context,
5056  interface_addr,
5057  ip_port,
5058  iface_no);
5059 
5060  return (res);
5061 }
5062 
5063 
5065  void *context,
5066  const struct totem_ip_address *iface_addr,
5067  unsigned int iface_no)
5068 {
5069  struct totemsrp_instance *instance = context;
5070  int num_interfaces;
5071  int i;
5072 
5073  if (!instance->my_id.nodeid) {
5074  instance->my_id.nodeid = iface_addr->nodeid;
5075  }
5076  totemip_copy (&instance->my_addrs[iface_no], iface_addr);
5077 
5078  if (instance->iface_changes++ == 0) {
5079  instance->memb_ring_id_create_or_load (&instance->my_ring_id, instance->my_id.nodeid);
5080  instance->token_ring_id_seq = instance->my_ring_id.seq;
5081  log_printf (
5082  instance->totemsrp_log_level_debug,
5083  "Created or loaded sequence id %llx.%u for this ring.",
5084  instance->my_ring_id.seq,
5085  instance->my_ring_id.rep);
5086 
5087  if (instance->totemsrp_service_ready_fn) {
5088  instance->totemsrp_service_ready_fn ();
5089  }
5090 
5091  }
5092 
5093  for (i = 0; i < instance->totem_config->interfaces[iface_no].member_count; i++) {
5094  totemsrp_member_add (instance,
5095  &instance->totem_config->interfaces[iface_no].member_list[i],
5096  iface_no);
5097  }
5098 
5099  num_interfaces = 0;
5100  for (i = 0; i < INTERFACE_MAX; i++) {
5101  if (instance->totem_config->interfaces[i].configured) {
5102  num_interfaces++;
5103  }
5104  }
5105 
5106  if (instance->iface_changes >= num_interfaces) {
5107  memb_state_gather_enter (instance, TOTEMSRP_GSFROM_INTERFACE_CHANGE);
5108  }
5109 }
5110 
5111 void totemsrp_net_mtu_adjust (struct totem_config *totem_config) {
5112  totem_config->net_mtu -= 2 * sizeof (struct mcast);
5113 }
5114 
5116  void *context,
5117  void (*totem_service_ready) (void))
5118 {
5119  struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
5120 
5121  instance->totemsrp_service_ready_fn = totem_service_ready;
5122 }
5123 
5125  void *context,
5126  const struct totem_ip_address *member,
5127  int iface_no)
5128 {
5129  struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
5130  int res;
5131 
5132  res = totemnet_member_add (instance->totemnet_context, &instance->my_addrs[iface_no], member, iface_no);
5133 
5134  return (res);
5135 }
5136 
5138  void *context,
5139  const struct totem_ip_address *member,
5140  int iface_no)
5141 {
5142  struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
5143  int res;
5144 
5145  res = totemnet_member_remove (instance->totemnet_context, member, iface_no);
5146 
5147  return (res);
5148 }
5149 
5150 void totemsrp_threaded_mode_enable (void *context)
5151 {
5152  struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
5153 
5154  instance->threaded_mode_enabled = 1;
5155 }
5156 
5157 void totemsrp_trans_ack (void *context)
5158 {
5159  struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
5160 
5161  instance->waiting_trans_ack = 0;
5162  instance->totemsrp_waiting_trans_ack_cb_fn (0);
5163 }
5164 
5165 
5166 int totemsrp_reconfigure (void *context, struct totem_config *totem_config)
5167 {
5168  struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
5169  int res;
5170 
5171  res = totemnet_reconfigure (instance->totemnet_context, totem_config);
5172  return (res);
5173 }
5174 
5175 void totemsrp_stats_clear (void *context, int flags)
5176 {
5177  struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
5178 
5179  memset(&instance->stats, 0, sizeof(totemsrp_stats_t));
5180  if (flags & TOTEMPG_STATS_CLEAR_TRANSPORT) {
5182  }
5183 }
5184 
/*
 * Force the protocol into the gather state by simulating an ORF token
 * timeout on this instance.
 */
void totemsrp_force_gather (void *context)
{
	timer_function_orf_token_timeout (context);
}
void(* totemsrp_service_ready_fn)(void)
Definition: totemsrp.c:464
unsigned int backlog
Definition: totemsrp.c:204
void(* totemsrp_deliver_fn)(unsigned int nodeid, const void *msg, unsigned int msg_len, int endian_conversion_required)
Definition: totemsrp.c:451
void(*) enum memb_stat memb_state)
Definition: totemsrp.c:443
unsigned short family
Definition: coroapi.h:113
int totemsrp_reconfigure(void *context, struct totem_config *totem_config)
Definition: totemsrp.c:5166
struct totem_message_header header
Definition: totemsrp.c:181
int totemsrp_iface_set(void *context, const struct totem_ip_address *interface_addr, unsigned short ip_port, unsigned int iface_no)
Definition: totemsrp.c:5043
gather_state_from
Definition: totemsrp.c:539
void main_iface_change_fn(void *context, const struct totem_ip_address *iface_address, unsigned int iface_no)
Definition: totemsrp.c:5064
struct srp_addr system_from
Definition: totemsrp.c:214
unsigned int nodeid
Definition: totem.h:129
struct memb_ring_id ring_id
Definition: totemsrp.c:192
int totemnet_mcast_flush_send(void *net_context, const void *msg, unsigned int msg_len)
Definition: totemnet.c:410
uint32_t waiting_trans_ack
Definition: totemsrp.c:521
struct srp_addr system_from
Definition: totemsrp.c:182
struct memb_ring_id ring_id
Definition: totemsrp.c:251
int totemsrp_log_level_debug
Definition: totemsrp.c:429
struct memb_ring_id my_ring_id
Definition: totemsrp.c:337
Totem Single Ring Protocol.
uint64_t memb_commit_token_rx
Definition: totemstats.h:65
qb_loop_timer_handle timer_orf_token_warning
Definition: totemsrp.c:402
int my_leave_memb_entries
Definition: totemsrp.c:335
unsigned int old_ring_state_high_seq_received
Definition: totemsrp.c:491
unsigned int proc_list_entries
Definition: totemsrp.c:215
uint32_t value
struct totem_interface * interfaces
Definition: totem.h:158
int totemsrp_my_family_get(void *srp_context)
Definition: totemsrp.c:1112
struct qb_list_head list
Definition: totemsrp.c:167
void(* memb_ring_id_create_or_load)(struct memb_ring_id *memb_ring_id, unsigned int nodeid)
Definition: totemsrp.c:469
void(* totemsrp_confchg_fn)(enum totem_configuration_type configuration_type, const unsigned int *member_list, size_t member_list_entries, const unsigned int *left_list, size_t left_list_entries, const unsigned int *joined_list, size_t joined_list_entries, const struct memb_ring_id *ring_id)
Definition: totemsrp.c:457
The totem_ip_address struct.
Definition: coroapi.h:111
unsigned int seq
Definition: totemsrp.c:262
totemsrp_token_stats_t token[TOTEM_TOKEN_STATS_MAX]
Definition: totemstats.h:90
int totemsrp_log_level_error
Definition: totemsrp.c:423
int old_ring_state_aru
Definition: totemsrp.c:489
#define LEAVE_DUMMY_NODEID
Definition: totemsrp.c:99
unsigned int seq
Definition: totemsrp.c:199
struct memb_ring_id ring_id
Definition: totemsrp.c:241
int fcc_remcast_current
Definition: totemsrp.c:293
#define TOTEM_MH_MAGIC
Definition: totem.h:121
qb_loop_timer_handle timer_heartbeat_timeout
Definition: totemsrp.c:416
unsigned int failed_list_entries
Definition: totemsrp.c:216
unsigned char end_of_memb_join[0]
Definition: totemsrp.c:265
uint64_t mcast_rx
Definition: totemstats.h:63
unsigned long long int tv_old
Definition: totemsrp.c:3814
int totemsrp_member_add(void *context, const struct totem_ip_address *member, int iface_no)
Definition: totemsrp.c:5124
#define SEQNO_START_TOKEN
Definition: totemsrp.c:119
void totemnet_stats_clear(void *net_context)
Definition: totemnet.c:574
unsigned int token_hold_timeout
Definition: totem.h:180
unsigned int msg_len
Definition: totemsrp.c:266
int member_count
Definition: totem.h:88
struct memb_ring_id ring_id
Definition: totemsrp.c:203
struct totem_ip_address member_list[PROCESSOR_COUNT_MAX]
Definition: totem.h:95
void * token_sent_event_handle
Definition: totemsrp.c:526
int retrans_flg
Definition: totemsrp.c:206
struct srp_addr system_from
Definition: totemsrp.c:229
int my_new_memb_entries
Definition: totemsrp.c:325
totem_configuration_type
The totem_configuration_type enum.
Definition: coroapi.h:132
int addr_entries
Definition: totemsrp.c:265
int totemsrp_log_level_notice
Definition: totemsrp.c:427
unsigned int proc_list_entries
Definition: totemsrp.c:262
unsigned int totemsrp_my_nodeid_get(void *srp_context)
Definition: totemsrp.c:1101
void(* totem_memb_ring_id_create_or_load)(struct memb_ring_id *memb_ring_id, unsigned int nodeid)
Definition: totem.h:236
unsigned int my_pbl
Definition: totemsrp.c:505
int totemnet_member_remove(void *net_context, const struct totem_ip_address *member, int ring_no)
Definition: totemnet.c:524
#define TOTEM_TOKEN_STATS_MAX
Definition: totemstats.h:89
void totemsrp_net_mtu_adjust(struct totem_config *totem_config)
Definition: totemsrp.c:5111
int totemsrp_log_level_warning
Definition: totemsrp.c:425
int totemsrp_crypto_set(void *srp_context, const char *cipher_type, const char *hash_type)
Definition: totemsrp.c:1087
unsigned int my_aru
Definition: totemsrp.c:381
int totemsrp_member_remove(void *context, const struct totem_ip_address *member, int iface_no)
Definition: totemsrp.c:5137
uint64_t memb_merge_detect_rx
Definition: totemstats.h:58
int guarantee
Definition: totemsrp.c:266
struct cs_queue new_message_queue_trans
Definition: totemsrp.c:370
unsigned char end_of_commit_token[0]
Definition: totemsrp.c:255
unsigned int seq
Definition: totemsrp.c:183
unsigned char addr[TOTEMIP_ADDRLEN]
Definition: coroapi.h:77
char commit_token_storage[40000]
Definition: totemsrp.c:527
The sq struct.
Definition: sq.h:43
unsigned int set_aru
Definition: totemsrp.c:485
struct cs_queue new_message_queue
Definition: totemsrp.c:368
int my_rotation_counter
Definition: totemsrp.c:355
struct srp_addr my_deliver_memb_list[PROCESSOR_COUNT_MAX]
Definition: totemsrp.c:315
uint64_t orf_token_tx
Definition: totemstats.h:55
void totemsrp_callback_token_destroy(void *srp_context, void **handle_out)
Definition: totemsrp.c:3492
uint64_t gather_token_lost
Definition: totemstats.h:71
int totemsrp_log_level_trace
Definition: totemsrp.c:431
void totemip_copy(struct totem_ip_address *addr1, const struct totem_ip_address *addr2)
Definition: totemip.c:96
struct memb_ring_id my_old_ring_id
Definition: totemsrp.c:339
memb_state
Definition: totemsrp.c:274
unsigned int downcheck_timeout
Definition: totem.h:192
struct qb_list_head token_callback_received_listhead
Definition: totemsrp.c:385
struct srp_addr my_new_memb_list[PROCESSOR_COUNT_MAX]
Definition: totemsrp.c:309
#define TOKEN_SIZE_MAX
Definition: totemsrp.c:98
uint64_t memb_commit_token_tx
Definition: totemstats.h:64
int my_deliver_memb_entries
Definition: totemsrp.c:331
unsigned int max_network_delay
Definition: totem.h:208
unsigned int heartbeat_failures_allowed
Definition: totem.h:206
unsigned int my_last_seq
Definition: totemsrp.c:493
int my_left_memb_entries
Definition: totemsrp.c:333
#define swab64(x)
The swab64 macro.
Definition: swab.h:65
struct message_item __attribute__
unsigned long long token_ring_id_seq
Definition: totemsrp.c:481
struct totem_ip_address mcast_address
Definition: totemsrp.c:449
int totemsrp_callback_token_create(void *srp_context, void **handle_out, enum totem_callback_token_type type, int delete, int(*callback_fn)(enum totem_callback_token_type type, const void *), const void *data)
Definition: totemsrp.c:3457
unsigned int send_join_timeout
Definition: totem.h:186
unsigned int window_size
Definition: totem.h:210
int guarantee
Definition: totemsrp.c:187
unsigned int seq
Definition: totemsrp.c:193
void totemsrp_service_ready_register(void *context, void(*totem_service_ready)(void))
Definition: totemsrp.c:5115
struct mcast * mcast
Definition: totemsrp.c:270
struct srp_addr my_memb_list[PROCESSOR_COUNT_MAX]
Definition: totemsrp.c:313
uint64_t operational_entered
Definition: totemstats.h:68
void(*) in log_level_security)
Definition: totem.h:106
unsigned long long ring_seq
Definition: totemsrp.c:217
#define INTERFACE_MAX
Definition: coroapi.h:88
int totemsrp_mcast(void *srp_context, struct iovec *iovec, unsigned int iov_len, int guarantee)
Multicast a message.
Definition: totemsrp.c:2459
message_type
Definition: totemsrp.c:143
uint64_t operational_token_lost
Definition: totemstats.h:69
unsigned int received_flg
Definition: totemsrp.c:263
uint64_t consensus_timeouts
Definition: totemstats.h:76
unsigned int aru_addr
Definition: totemsrp.c:202
Totem Network interface - also does encryption/decryption.
unsigned int my_high_delivered
Definition: totemsrp.c:383
struct message_handlers totemsrp_message_handlers
Definition: totemsrp.c:677
qb_loop_timer_handle memb_timer_state_gather_consensus_timeout
Definition: totemsrp.c:412
uint64_t recovery_token_lost
Definition: totemstats.h:75
int totemnet_recv_flush(void *net_context)
Definition: totemnet.c:378
int totemnet_token_target_set(void *net_context, unsigned int nodeid)
Definition: totemnet.c:481
int totemnet_ifaces_get(void *net_context, char ***status, unsigned int *iface_count)
Definition: totemnet.c:468
unsigned int backlog
Definition: totemsrp.c:266
int this_seqno
Definition: totemsrp.c:184
unsigned char end_of_memb_join[0]
Definition: totemsrp.c:218
unsigned int token_retransmits_before_loss_const
Definition: totem.h:182
uint8_t configured
Definition: totem.h:87
totemsrp_stats_t * srp
Definition: totemstats.h:96
struct rtr_item rtr_list[0]
Definition: totemsrp.c:270
unsigned int retrans_flg
Definition: totemsrp.c:252
struct memb_ring_id ring_id
Definition: totemsrp.c:185
unsigned int seqno_unchanged_const
Definition: totem.h:196
uint64_t commit_token_lost
Definition: totemstats.h:73
unsigned int miss_count_const
Definition: totem.h:232
uint64_t token_hold_cancel_rx
Definition: totemstats.h:67
int totemnet_crypto_set(void *net_context, const char *cipher_type, const char *hash_type)
Definition: totemnet.c:276
unsigned int join_timeout
Definition: totem.h:184
unsigned int aru
Definition: totemsrp.c:242
uint32_t originated_orf_token
Definition: totemsrp.c:517
unsigned int rep
Definition: totem.h:148
void * totemnet_buffer_alloc(void *net_context)
Definition: totemnet.c:351
unsigned int nodeid
Definition: coroapi.h:112
uint32_t flags
uint64_t pause_timestamp
Definition: totemsrp.c:509
int my_set_retrans_flg
Definition: totemsrp.c:357
struct totem_ip_address mcast_addr
Definition: totem.h:84
#define MESSAGE_QUEUE_MAX
Definition: coroapi.h:98
int totemnet_recv_mcast_empty(void *net_context)
Definition: totemnet.c:493
unsigned int received_flg
Definition: totemsrp.c:244
unsigned int my_cbl
Definition: totemsrp.c:507
unsigned int last_released
Definition: totemsrp.c:483
int orf_token_retransmit_size
Definition: totemsrp.c:391
int totemsrp_avail(void *srp_context)
Return number of available messages that can be queued.
Definition: totemsrp.c:2530
uint64_t mcast_retx
Definition: totemstats.h:62
int totemnet_initialize(qb_loop_t *loop_pt, void **net_context, struct totem_config *totem_config, totemsrp_stats_t *stats, void *context, void(*deliver_fn)(void *context, const void *msg, unsigned int msg_len, const struct sockaddr_storage *system_from), void(*iface_change_fn)(void *context, const struct totem_ip_address *iface_address, unsigned int ring_no), void(*mtu_changed)(void *context, int net_mtu), void(*target_set_completed)(void *context))
Definition: totemnet.c:301
unsigned int msg_len
Definition: totemsrp.c:271
#define RETRANS_MESSAGE_QUEUE_SIZE_MAX
Definition: totemsrp.c:94
#define LOGSYS_LEVEL_DEBUG
Definition: logsys.h:76
unsigned int fail_to_recv_const
Definition: totem.h:194
int totemnet_send_flush(void *net_context)
Definition: totemnet.c:388
#define TOTEM_MH_VERSION
Definition: totem.h:122
unsigned int token_seq
Definition: totemsrp.c:200
void totemsrp_stats_clear(void *context, int flags)
Definition: totemsrp.c:5175
int totemnet_mcast_noflush_send(void *net_context, const void *msg, unsigned int msg_len)
Definition: totemnet.c:423
struct mcast * mcast
Definition: totemsrp.c:265
void * token_recv_event_handle
Definition: totemsrp.c:525
struct totem_ip_address boundto
Definition: totem.h:83
unsigned int my_high_seq_received
Definition: totemsrp.c:351
qb_loop_t * totemsrp_poll_handle
Definition: totemsrp.c:447
int totemnet_finalize(void *net_context)
Definition: totemnet.c:290
totem_event_type
Definition: totem.h:257
qb_loop_timer_handle timer_pause_timeout
Definition: totemsrp.c:398
qb_loop_timer_handle timer_merge_detect_timeout
Definition: totemsrp.c:408
int old_ring_state_saved
Definition: totemsrp.c:487
int my_merge_detect_timeout_outstanding
Definition: totemsrp.c:343
uint64_t rx_msg_dropped
Definition: totemstats.h:77
void(* log_printf)(int level, int subsys, const char *function_name, const char *file_name, int file_line, const char *format,...) __attribute__((format(printf
Definition: totem.h:99
int totemsrp_log_level_security
Definition: totemsrp.c:421
qb_loop_timer_handle timer_orf_token_retransmit_timeout
Definition: totemsrp.c:404
struct totem_config * totem_config
Definition: totemsrp.c:499
int(* callback_fn)(enum totem_callback_token_type type, const void *)
Definition: totemsrp.c:168
struct totem_message_header header
Definition: totemsrp.c:260
#define swab32(x)
The swab32 macro.
Definition: swab.h:51
qb_loop_timer_handle timer_orf_token_timeout
Definition: totemsrp.c:400
uint32_t continuous_gather
Definition: totemstats.h:78
void main_deliver_fn(void *context, const void *msg, unsigned int msg_len, const struct sockaddr_storage *system_from)
Definition: totemsrp.c:4992
void totemsrp_threaded_mode_enable(void *context)
Definition: totemsrp.c:5150
int totemnet_token_send(void *net_context, const void *msg, unsigned int msg_len)
Definition: totemnet.c:398
unsigned int aru
Definition: totemsrp.c:263
encapsulation_type
Definition: totemsrp.c:152
unsigned int net_mtu
Definition: totem.h:202
int totemsrp_ifaces_get(void *srp_context, unsigned int nodeid, unsigned int *interface_id, struct totem_ip_address *interfaces, unsigned int interfaces_size, char ***status, unsigned int *iface_count)
Definition: totemsrp.c:1049
int totemsrp_initialize(qb_loop_t *poll_handle, void **srp_context, struct totem_config *totem_config, totempg_stats_t *stats, void(*deliver_fn)(unsigned int nodeid, const void *msg, unsigned int msg_len, int endian_conversion_required), void(*confchg_fn)(enum totem_configuration_type configuration_type, const unsigned int *member_list, size_t member_list_entries, const unsigned int *left_list, size_t left_list_entries, const unsigned int *joined_list, size_t joined_list_entries, const struct memb_ring_id *ring_id), void(*waiting_trans_ack_cb_fn)(int waiting_trans_ack))
Create a protocol instance.
Definition: totemsrp.c:818
struct totem_message_header header
Definition: totemsrp.c:249
void totemsrp_event_signal(void *srp_context, enum totem_event_type type, int value)
Definition: totemsrp.c:2450
void(* totem_memb_ring_id_store)(const struct memb_ring_id *memb_ring_id, unsigned int nodeid)
Definition: totem.h:240
unsigned int node_id
Definition: totemsrp.c:186
uint32_t orf_token_discard
Definition: totemsrp.c:515
int my_failed_list_entries
Definition: totemsrp.c:323
struct srp_addr my_id
Definition: totemsrp.c:301
struct srp_addr my_left_memb_list[PROCESSOR_COUNT_MAX]
Definition: totemsrp.c:317
uint64_t token_hold_cancel_tx
Definition: totemstats.h:66
const char * totemip_sa_print(const struct sockaddr *sa)
Definition: totemip.c:215
unsigned int token_timeout
Definition: totem.h:174
Definition: totemsrp.c:240
struct totem_message_header header
Definition: totemsrp.c:198
unsigned int high_delivered
Definition: totemsrp.c:243
unsigned int consensus_timeout
Definition: totem.h:188
totemsrp_stats_t stats
Definition: totemsrp.c:513
void totemsrp_force_gather(void *context)
Definition: totemsrp.c:5185
#define PROCESSOR_COUNT_MAX
Definition: coroapi.h:96
struct totem_ip_address my_addrs[INTERFACE_MAX]
Definition: totemsrp.c:303
uint64_t mcast_tx
Definition: totemstats.h:61
char orf_token_retransmit[TOKEN_SIZE_MAX]
Definition: totemsrp.c:389
unsigned int token_warning
Definition: totem.h:176
struct sq regular_sort_queue
Definition: totemsrp.c:374
int my_retrans_flg_count
Definition: totemsrp.c:359
The memb_ring_id struct.
Definition: coroapi.h:122
#define SEQNO_START_MSG
Definition: totemsrp.c:118
#define swab16(x)
The swab16 macro.
Definition: swab.h:39
struct totem_message_header header
Definition: totemsrp.c:235
void totemsrp_finalize(void *srp_context)
Definition: totemsrp.c:1026
struct totem_message_header header
Definition: totemsrp.c:228
#define QUEUE_RTR_ITEMS_SIZE_MAX
Definition: totemsrp.c:93
struct srp_addr my_failed_list[PROCESSOR_COUNT_MAX]
Definition: totemsrp.c:307
struct cs_queue retrans_message_queue
Definition: totemsrp.c:372
unsigned int aru
Definition: totemsrp.c:201
const char * gather_state_from_desc[]
Definition: totemsrp.c:559
qb_loop_timer_handle memb_timer_state_gather_join_timeout
Definition: totemsrp.c:410
int my_trans_memb_entries
Definition: totemsrp.c:327
unsigned int my_trc
Definition: totemsrp.c:503
void(* totemsrp_waiting_trans_ack_cb_fn)(int waiting_trans_ack)
Definition: totemsrp.c:466
uint64_t memb_merge_detect_tx
Definition: totemstats.h:57
unsigned int high_delivered
Definition: totemsrp.c:262
struct rtr_item rtr_list[0]
Definition: totemsrp.c:208
int consensus_list_entries
Definition: totemsrp.c:297
uint64_t memb_join_rx
Definition: totemstats.h:60
int totemnet_member_add(void *net_context, const struct totem_ip_address *local, const struct totem_ip_address *member, int ring_no)
Definition: totemnet.c:504
unsigned char end_of_commit_token[0]
Definition: totemsrp.c:266
#define FRAME_SIZE_MAX
Definition: totem.h:52
int rtr_list_entries
Definition: totemsrp.c:207
uint32_t threaded_mode_enabled
Definition: totemsrp.c:519
enum totem_callback_token_type callback_type
Definition: totemsrp.c:169
int my_proc_list_entries
Definition: totemsrp.c:321
unsigned long long ring_seq
Definition: totemsrp.c:264
struct totem_message_header header
Definition: totemsrp.c:213
struct totem_logging_configuration totem_logging_configuration
Definition: totem.h:200
struct memb_ring_id ring_id
Definition: totemsrp.c:236
#define log_printf(level, format, args...)
Definition: totemsrp.c:689
unsigned long long seq
Definition: coroapi.h:124
void totemsrp_trans_ack(void *context)
Definition: totemsrp.c:5157
unsigned int max_messages
Definition: totem.h:212
uint64_t recovery_entered
Definition: totemstats.h:74
qb_loop_timer_handle memb_timer_state_commit_timeout
Definition: totemsrp.c:414
void totemnet_buffer_release(void *net_context, void *ptr)
Definition: totemnet.c:359
int totemnet_reconfigure(void *net_context, struct totem_config *totem_config)
Definition: totemnet.c:560
struct memb_commit_token * commit_token
Definition: totemsrp.c:511
struct consensus_list_item consensus_list[PROCESSOR_COUNT_MAX]
Definition: totemsrp.c:295
struct srp_addr system_from
Definition: totemsrp.c:261
struct srp_addr addr
Definition: totemsrp.c:161
char type
Definition: totem.h:55
struct srp_addr my_proc_list[PROCESSOR_COUNT_MAX]
Definition: totemsrp.c:305
void(* memb_ring_id_store)(const struct memb_ring_id *memb_ring_id, unsigned int nodeid)
Definition: totemsrp.c:473
int(* handler_functions[6])(struct totemsrp_instance *instance, const void *msg, size_t msg_len, int endian_conversion_needed)
Definition: totemsrp.c:532
int totemsrp_subsys_id
Definition: totemsrp.c:433
unsigned int merge_timeout
Definition: totem.h:190
unsigned int use_heartbeat
Definition: totemsrp.c:501
int totemnet_iface_check(void *net_context)
Definition: totemnet.c:436
unsigned int token_retransmit_timeout
Definition: totem.h:178
struct qb_list_head token_callback_sent_listhead
Definition: totemsrp.c:387
int rtr_list_entries
Definition: totemsrp.c:269
struct srp_addr my_trans_memb_list[PROCESSOR_COUNT_MAX]
Definition: totemsrp.c:311
#define RETRANSMIT_ENTRIES_MAX
Definition: totemsrp.c:97
unsigned short magic
Definition: totem.h:125
void(* totemsrp_log_printf)(int level, int sybsys, const char *function, const char *file, int line, const char *format,...) __attribute__((format(printf
Definition: totemsrp.c:435
unsigned int token_seq
Definition: totemsrp.c:250
unsigned int my_token_seq
Definition: totemsrp.c:393
struct memb_ring_id ring_id
Definition: totemsrp.c:264
unsigned int my_last_aru
Definition: totemsrp.c:345
unsigned int my_leave_memb_list[PROCESSOR_COUNT_MAX]
Definition: totemsrp.c:319
unsigned int nodeid
Definition: totemsrp.c:105
void * totemnet_context
Definition: totemsrp.c:497
uint64_t commit_entered
Definition: totemstats.h:72
qb_loop_timer_handle timer_orf_token_hold_retransmit_timeout
Definition: totemsrp.c:406
int totemnet_iface_set(void *net_context, const struct totem_ip_address *interface_addr, unsigned short ip_port, unsigned int iface_no)
Definition: totemnet.c:455
struct memb_ring_id ring_id
Definition: totemsrp.c:230
unsigned int my_install_seq
Definition: totemsrp.c:353
uint64_t orf_token_rx
Definition: totemstats.h:56
unsigned int threads
Definition: totem.h:204
unsigned int failed_list_entries
Definition: totemsrp.c:263
struct sq recovery_sort_queue
Definition: totemsrp.c:376
#define TOTEMPG_STATS_CLEAR_TRANSPORT
Definition: totemstats.h:116
totem_callback_token_type
The totem_callback_token_type enum.
Definition: coroapi.h:142
unsigned int my_high_ring_delivered
Definition: totemsrp.c:361
int totemnet_processor_count_set(void *net_context, int processor_count)
Definition: totemnet.c:367
unsigned int fcc
Definition: totemsrp.c:205