gnunet

Main GNUnet Logic
Log | Files | Refs | Submodules | README | LICENSE

commit c81d7e1199cf2779973900677ffb3d12e12ca96f
parent 4f47b28bace8908afbd9d2b5d4093005931d298d
Author: Florian Dold <florian.dold@gmail.com>
Date:   Tue,  6 Oct 2015 15:22:02 +0000

towards handling byz. faults correctly

- blacklisting of peers in consensus
- restructuring


Diffstat:
Msrc/consensus/gnunet-service-consensus.c | 278+++++++++++++++++++++++++++++++++++++++++++++++++++----------------------------
1 file changed, 179 insertions(+), 99 deletions(-)

diff --git a/src/consensus/gnunet-service-consensus.c b/src/consensus/gnunet-service-consensus.c @@ -34,9 +34,34 @@ #include "consensus.h" +#define ELEMENT_TYPE_CONTESTED_MARKER (GNUNET_CONSENSUS_ELEMENT_TYPE_USER_MAX + 1) + + +enum ReferendumVote +{ + /** + * Vote that nothing should change. + * This option is never voted explicitly. + */ + VOTE_STAY = 0, + /** + * Vote that an element should be added. + */ + VOTE_ADD = 1, + /** + * Vote that an element should be removed. + */ + VOTE_REMOVE = 2, +}; + GNUNET_NETWORK_STRUCT_BEGIN + +struct ContestedPayload +{ +}; + /** * Tuple of integers that together * identify a task uniquely. @@ -72,23 +97,6 @@ struct TaskKey { }; -enum ReferendumVote -{ - /** - * Vote that nothing should change. - * This option is never voted explicitly. - */ - VOTE_STAY = 0, - /** - * Vote that an element should be added. - */ - VOTE_ADD = 1, - /** - * Vote that an element should be removed. - */ - VOTE_REMOVE = 2, -}; - struct SetKey { @@ -136,7 +144,6 @@ enum PhaseKind PHASE_KIND_GRADECAST_ECHO_GRADE, PHASE_KIND_GRADECAST_CONFIRM, PHASE_KIND_GRADECAST_CONFIRM_GRADE, - PHASE_KIND_GRADECAST_APPLY_RESULT, /** * Apply a repetition of the all-to-all * gradecast to the current set. @@ -484,9 +491,6 @@ static void finish_task (struct TaskEntry *task); static void -task_start_reconcile (struct TaskEntry *task); - -static void run_ready_steps (struct ConsensusSession *session); static const char * @@ -501,7 +505,6 @@ phasename (uint16_t phase) case PHASE_KIND_GRADECAST_ECHO_GRADE: return "GRADECAST_ECHO_GRADE"; case PHASE_KIND_GRADECAST_CONFIRM: return "GRADECAST_CONFIRM"; case PHASE_KIND_GRADECAST_CONFIRM_GRADE: return "GRADECAST_CONFIRM_GRADE"; - case PHASE_KIND_GRADECAST_APPLY_RESULT: return "GRADECAST_APPLY_RESULT"; case PHASE_KIND_APPLY_REP: return "APPLY_REP"; default: return "(unknown)"; } @@ -772,24 +775,39 @@ diff_insert (struct DiffEntry *diff, static void +rfn_commit (struct ReferendumEntry *rfn, + uint16_t commit_peer) +{ + GNUNET_assert (commit_peer < rfn->num_peers); + + rfn->peer_commited[commit_peer] = GNUNET_YES; +} + + +static void +rfn_contest (struct ReferendumEntry *rfn, + uint16_t contested_peer) +{ + GNUNET_assert (contested_peer < rfn->num_peers); + + rfn->peer_contested[contested_peer] = GNUNET_YES; +} + +static void rfn_vote (struct ReferendumEntry *rfn, uint16_t voting_peer, - uint16_t num_peers, enum ReferendumVote vote, const struct GNUNET_SET_Element *element) { struct RfnElementInfo *ri; struct GNUNET_HashCode hash; - GNUNET_assert (voting_peer < num_peers); + GNUNET_assert (voting_peer < rfn->num_peers); /* Explicit voting only makes sense with VOTE_ADD or VOTE_REMOTE, since VOTE_KEEP is implicit in not voting. */ GNUNET_assert ( (VOTE_ADD == vote) || (VOTE_REMOVE == vote) ); - // XXX: should happen in another place! - rfn->peer_commited[voting_peer] = GNUNET_YES; - GNUNET_SET_element_hash (element, &hash); ri = GNUNET_CONTAINER_multihashmap_get (rfn->rfn_elements, &hash); @@ -797,7 +815,7 @@ rfn_vote (struct ReferendumEntry *rfn, { ri = GNUNET_new (struct RfnElementInfo); ri->element = GNUNET_SET_element_dup (element); - ri->votes = GNUNET_new_array (num_peers, int); + ri->votes = GNUNET_new_array (rfn->num_peers, int); GNUNET_assert (GNUNET_OK == GNUNET_CONTAINER_multihashmap_put (rfn->rfn_elements, &hash, ri, @@ -897,6 +915,16 @@ set_result_cb (void *cls, return; } + if ( (GNUNET_SET_STATUS_ADD_LOCAL == status) || (GNUNET_SET_STATUS_ADD_REMOTE == status) ) + { + if ( (GNUNET_YES == setop->transceive_contested) && (ELEMENT_TYPE_CONTESTED_MARKER == element->element_type) ) + { + GNUNET_assert (NULL != output_rfn); + rfn_contest (output_rfn, task_other_peer (task)); + return; + } + } + switch (status) { case GNUNET_SET_STATUS_ADD_LOCAL: @@ -933,7 +961,7 @@ set_result_cb (void *cls, } if (NULL != output_rfn) { - rfn_vote (output_rfn, task_other_peer (task), session->num_peers, VOTE_ADD, element); + rfn_vote (output_rfn, task_other_peer (task), VOTE_ADD, element); #ifdef GNUNET_EXTRA_LOGGING GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%u: adding element %s into rfn {%s} of task {%s}\n", @@ -948,6 +976,8 @@ set_result_cb (void *cls, case GNUNET_SET_STATUS_ADD_REMOTE: if (GNUNET_YES == setop->do_not_remove) break; + if (ELEMENT_TYPE_CONTESTED_MARKER == element->element_type) + break; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Removing element in Task {%s}\n", debug_str_task_key (&task->key)); @@ -981,7 +1011,7 @@ set_result_cb (void *cls, } if (NULL != output_rfn) { - rfn_vote (output_rfn, task_other_peer (task), session->num_peers, VOTE_REMOVE, element); + rfn_vote (output_rfn, task_other_peer (task), VOTE_REMOVE, element); #ifdef GNUNET_EXTRA_LOGGING GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%u: removing element %s from rfn {%s} of task {%s}\n", @@ -995,10 +1025,13 @@ set_result_cb (void *cls, case GNUNET_SET_STATUS_DONE: // XXX: check first if any changes to the underlying // set are still pending - // XXX: commit other peer in referendum GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Finishing setop in Task {%s}\n", debug_str_task_key (&task->key)); + if (NULL != output_rfn) + { + rfn_commit (output_rfn, task_other_peer (task)); + } finish_task (task); break; case GNUNET_SET_STATUS_FAILURE: @@ -1159,6 +1192,15 @@ commit_set (struct ConsensusSession *session, } } #else + if ( (GNUNET_YES == setop->transceive_contested) && (GNUNET_YES == set->is_contested) ) + { + struct GNUNET_SET_Element element; + struct ContestedPayload payload; + element.data = &payload; + element.size = sizeof (struct ContestedPayload); + element.element_type = ELEMENT_TYPE_CONTESTED_MARKER; + GNUNET_SET_add_element (set->h, &element, NULL, NULL); + } GNUNET_SET_commit (setop->op, set->h); #endif } @@ -1212,25 +1254,6 @@ put_rfn (struct ConsensusSession *session, static void -output_cloned_cb (void *cls, struct GNUNET_SET_Handle *copy) -{ - struct TaskEntry *task = (struct TaskEntry *) cls; - struct SetOpCls *setop = &task->cls.setop; - struct ConsensusSession *session = task->step->session; - struct SetEntry *set = GNUNET_new (struct SetEntry); - - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "P%u: Received lazy copy, storing output set %s\n", - session->local_peer_idx, debug_str_set_key (&setop->output_set)); - - set->key = setop->output_set; - set->h = copy; - put_set (task->step->session, set); - task_start_reconcile (task); -} - - -static void task_cancel_reconcile (struct TaskEntry *task) { /* not implemented yet */ @@ -1249,15 +1272,18 @@ apply_diff_to_rfn (struct DiffEntry *diff, iter = GNUNET_CONTAINER_multihashmap_iterator_create (diff->changes); - while (GNUNET_YES == GNUNET_CONTAINER_multihashmap_iterator_next (iter, NULL, (const void **) &di)) + while (GNUNET_YES == + GNUNET_CONTAINER_multihashmap_iterator_next (iter, + NULL, + (const void **) &di)) { if (di->weight > 0) { - rfn_vote (rfn, voting_peer, num_peers, VOTE_ADD, di->element); + rfn_vote (rfn, voting_peer, VOTE_ADD, di->element); } if (di->weight < 0) { - rfn_vote (rfn, voting_peer, num_peers, VOTE_REMOVE, di->element); + rfn_vote (rfn, voting_peer, VOTE_REMOVE, di->element); } } @@ -1401,6 +1427,12 @@ create_set_copy_for_task (struct TaskEntry *task, struct SetEntry *src_set; struct SetCopyCls *scc = GNUNET_new (struct SetCopyCls); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Copying set {%s} to {%s} for task {%s}\n", + debug_str_set_key (src_set_key), + debug_str_set_key (dst_set_key), + debug_str_task_key (&task->key)); + scc->task = task; scc->dst_set_key = *dst_set_key; src_set = lookup_set (task->step->session, src_set_key); @@ -1410,6 +1442,34 @@ create_set_copy_for_task (struct TaskEntry *task, scc); } + +struct SetMutationProgressCls +{ + int num_pending; + /** + * Task to finish once all changes are through. + */ + struct TaskEntry *task; +}; + + +static void +set_mutation_done (void *cls) +{ + struct SetMutationProgressCls *pc = cls; + + GNUNET_assert (pc->num_pending > 0); + + pc->num_pending--; + + if (0 == pc->num_pending) + { + struct TaskEntry *task = pc->task; + GNUNET_free (pc); + finish_task (task); + } +} + static void task_start_apply_round (struct TaskEntry *task) { @@ -1421,6 +1481,7 @@ task_start_apply_round (struct TaskEntry *task) struct ReferendumEntry *rfn_in; struct GNUNET_CONTAINER_MultiHashMapIterator *iter; struct RfnElementInfo *ri; + struct SetMutationProgressCls *progress_cls; sk_in = (struct SetKey) { SET_KIND_CURRENT, task->key.repetition }; rk_in = (struct RfnKey) { RFN_KIND_GRADECAST_RESULT, task->key.repetition }; @@ -1436,6 +1497,9 @@ task_start_apply_round (struct TaskEntry *task) rfn_in = lookup_rfn (session, &rk_in); GNUNET_assert (NULL != rfn_in); + progress_cls = GNUNET_new (struct SetMutationProgressCls); + progress_cls->task = task; + iter = GNUNET_CONTAINER_multihashmap_iterator_create (rfn_in->rfn_elements); while (GNUNET_YES == GNUNET_CONTAINER_multihashmap_iterator_next (iter, NULL, (const void **) &ri)) @@ -1448,10 +1512,20 @@ task_start_apply_round (struct TaskEntry *task) switch (majority_vote) { case VOTE_ADD: - // XXX: add to set + progress_cls->num_pending++; + GNUNET_assert (GNUNET_OK == + GNUNET_SET_add_element (set_out->h, + ri->element, + set_mutation_done, + progress_cls)); break; case VOTE_REMOVE: - // XXX: remove from set + progress_cls->num_pending++; + GNUNET_assert (GNUNET_OK == + GNUNET_SET_remove_element (set_out->h, + ri->element, + set_mutation_done, + progress_cls)); break; case VOTE_STAY: // do nothing @@ -1461,9 +1535,19 @@ task_start_apply_round (struct TaskEntry *task) break; } } + + if (progress_cls->num_pending == 0) + { + // call closure right now, no pending ops + GNUNET_free (progress_cls); + finish_task (task); + } } +#define THRESH(s) (((s)->num_peers / 3)) + + static void task_start_grade (struct TaskEntry *task) { @@ -1475,12 +1559,14 @@ task_start_grade (struct TaskEntry *task) struct DiffKey diff_key; struct GNUNET_CONTAINER_MultiHashMapIterator *iter; struct RfnElementInfo *ri; + unsigned int gradecast_confidence = 2; rfn_key = (struct RfnKey) { RFN_KIND_GRADECAST_RESULT, task->key.repetition }; output_rfn = lookup_rfn (session, &rfn_key); if (NULL == output_rfn) { output_rfn = rfn_create (session->num_peers); + output_rfn->key = rfn_key; put_rfn (session, output_rfn); } @@ -1492,26 +1578,50 @@ task_start_grade (struct TaskEntry *task) input_rfn = lookup_rfn (session, &rfn_key); GNUNET_assert (NULL != input_rfn); - apply_diff_to_rfn (input_diff, output_rfn, task->key.leader, session->num_peers); - iter = GNUNET_CONTAINER_multihashmap_iterator_create (input_rfn->rfn_elements); + apply_diff_to_rfn (input_diff, output_rfn, task->key.leader, session->num_peers); + while (GNUNET_YES == GNUNET_CONTAINER_multihashmap_iterator_next (iter, NULL, (const void **) &ri)) { uint16_t majority_num; enum ReferendumVote majority_vote; + // XXX: we need contested votes and non-contested votes here rfn_majority (input_rfn, ri, &majority_num, &majority_vote); - + if (majority_num < (session->num_peers / 3) * 2) + { + gradecast_confidence = GNUNET_MIN(1, gradecast_confidence); + } + if (majority_num < (session->num_peers / 3) + 1) + { + gradecast_confidence = 0; + } switch (majority_vote) { + case VOTE_STAY: + break; + case VOTE_ADD: + rfn_vote (output_rfn, task->key.leader, VOTE_ADD, ri->element); + break; + case VOTE_REMOVE: + rfn_vote (output_rfn, task->key.leader, VOTE_REMOVE, ri->element); + break; default: GNUNET_assert (0); break; } } + + if (gradecast_confidence >= 1) + rfn_commit (output_rfn, task->key.leader); + + if (gradecast_confidence <= 1) + session->peers_blacklisted[task->key.leader] = GNUNET_YES; + + finish_task (task); } @@ -1537,12 +1647,7 @@ task_start_reconcile (struct TaskEntry *task) we clone the input set. */ if (NULL == lookup_set (session, &setop->output_set)) { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Output set missing, copying from input set\n"); - /* Since the cloning is asynchronous, - we'll retry the current function once the copy - has been provided by the SET service. */ - GNUNET_SET_copy_lazy (input->h, output_cloned_cb, task); + create_set_copy_for_task (task, &setop->input_set, &setop->output_set); return; } } @@ -1640,37 +1745,6 @@ task_start_reconcile (struct TaskEntry *task) } - - -struct SetMutationProgressCls -{ - int num_pending; - /** - * Task to finish once all changes are through. - */ - struct TaskEntry *task; -}; - - -static void -set_mutation_done (void *cls) -{ - struct SetMutationProgressCls *pc = cls; - - GNUNET_assert (pc->num_pending > 0); - - pc->num_pending--; - - if (0 == pc->num_pending) - { - struct TaskEntry *task = pc->task; - GNUNET_free (pc); - finish_task (task); - } -} - - - static void task_start_eval_echo (struct TaskEntry *task) { @@ -1708,7 +1782,10 @@ task_start_eval_echo (struct TaskEntry *task) iter = GNUNET_CONTAINER_multihashmap_iterator_create (input_rfn->rfn_elements); GNUNET_assert (NULL != iter); - while (GNUNET_YES == GNUNET_CONTAINER_multihashmap_iterator_next (iter, NULL, (const void **) &ri)) + while (GNUNET_YES == + GNUNET_CONTAINER_multihashmap_iterator_next (iter, + NULL, + (const void **) &ri)) { enum ReferendumVote majority_vote; uint16_t majority_num; @@ -1717,7 +1794,12 @@ task_start_eval_echo (struct TaskEntry *task) if (majority_num < session->num_peers / 3) { - majority_vote = VOTE_REMOVE; + /* It is not the case that all nonfaulty peers + echoed the same value. Since we're doing a set reconciliation, we + can't simply send "nothing" for the value. Thus we mark our 'confirm' + reconciliation as contested. Other peers might not know that the + leader is faulty, thus we still re-distribute in the confirmation + round. */ output_set->is_contested = GNUNET_YES; } @@ -2509,8 +2591,6 @@ construct_task_graph (struct ConsensusSession *session) for (lead = 0; lead < n; lead++) construct_task_graph_gradecast (session, i, lead, step_rep_start, step_rep_end); - // TODO: add peers to blacklisted list, either here or - // already in the gradecast. task = ((struct TaskEntry) { .step = step_rep_end, .key = (struct TaskKey) { PHASE_KIND_APPLY_REP, -1, -1, i, -1},