Differences between revisions 2 and 35 (spanning 33 versions)
Revision 2 as of 2015-02-09 21:46:32
Size: 6194
Editor: lukisi
Comment:
Revision 35 as of 2015-09-25 09:28:45
Size: 22603
Editor: lukisi
Comment:
Deletions are marked like this. Additions are marked like this.
Line 1: Line 1:
= Modulo PeerServices - Appunti - Algoritmi principali =
=
== Algoritmo di rilevamento non partecipazione ===
====
bool check_non_participation(p_id, lvl, pos) ====
= Modulo PeerServices - Appunti - Algoritmi 2 / 2 =
<<TableOfContents(4)>>

== Algoritmo di rilevamento di non partecipazione ==
=== check_non_participation ===
'''
bool check_non_participation(p_id, lvl, _pos)'''
Line 5: Line 8:
 * Produci ''x̄'' = la tupla x̄~-,,0,,-~·x̄~-,,1,,-~·...·x̄~-,,lvl-1,,-~ dove x̄~-,,i,,-~ = 0 per ogni i. La tupla identifica un indirizzo a caso all'interno del g-nodo ''g'' = (lvl, pos). Se lvl = 0 allora x̄ è null.
 * Produci ''n'' = la tupla n~-,,0,,-~·n~-,,1,,-~·...·n~-,,lvl,,-~. La tupla che identifica il nodo corrente nel g-nodo di livello lvl+1 in cui il messaggio si muoverà.
 * Produci ''x̄'' = la tupla x̄~-,,0,,-~·x̄~-,,1,,-~·...·x̄~-,,lvl-1,,-~ dove x̄~-,,i,,-~ = 0 per ogni i. La tupla identifica un indirizzo a caso all'interno del g-nodo ''g'' = (lvl, _pos). Se lvl = 0 allora x̄ è null.
 * Produci ''n'' = make_tuple_node(new HCoord(0, pos[0]), lvl+1) , cioè la tupla n~-,,0,,-~·n~-,,1,,-~·...·n~-,,lvl,,-~. La tupla che identifica il nodo corrente nel g-nodo di livello lvl+1 in cui il messaggio si muoverà.
Line 11: Line 14:
 * ''m’.pos'' = pos.  * ''m’.pos'' = _pos.
Line 15: Line 18:
 * Prepara ''waiting_answer'' = new !WaitingAnswer(null, (lvl,pos) come PeerTupleGNode con top = lvl+1). ~-Il fatto che l'istanza di !RemoteCall è a null fa in modo che i metodi remoti che ricevono le notifiche si comportano in modo adeguato. Sostanzialmente dovrebbe cambiare solo il fatto che quando si riceve la segnalazione di get_request si risponde sempre con l'eccezione !PeersUnknownMessageError, ache se si è potuto recuperare l'istanza di !WaitingAnswer. Sull'istanza di !WaitingAnswer viene poi valorizzato il membro response con un !SerializableNone solo per indicare che il g-nodo partecipa.-~  * Prepara ''waiting_answer'' = new !WaitingAnswer(null, (lvl,_pos) come PeerTupleGNode con top = lvl+1). ~-Il fatto che l'istanza di IPeersRequest è a null fa in modo che i metodi remoti che ricevono le notifiche si comportano in modo adeguato. Sostanzialmente dovrebbe cambiare solo il fatto che quando si riceve la segnalazione di get_request si risponde sempre con l'eccezione !PeersUnknownMessageError, ache se si è potuto recuperare l'istanza di !WaitingAnswer. Sull'istanza di !WaitingAnswer viene poi valorizzato il membro response con qualcosa diverso da ''null'' solo per indicare che il g-nodo partecipa.-~
Line 17: Line 20:
 * IAddressManagerRootDispatcher ''gwstub''
 * IAddressManagerRootDispatcher? ''failed'' = null
 * IPeersManagerStub ''gwstub''
 * IPeersManagerStub? ''failed'' = null
Line 21: Line 24:
   * Calcola gwstub = map_paths.i_peers_gateway(lvl, pos, null, failed)    * Calcola gwstub = map_paths.i_peers_gateway(lvl, _pos, null, failed)
Line 25: Line 28:
   * Esegue gwstub.peers_manager.forward_peer_message(m’).
  * Se riceve l'eccezione RPCError:
   * Esegue gwstub.forward_peer_message(m’).
  * Se riceve !StubError o !DeserializeError:
Line 42: Line 45:
   * # significa che abbiamo ricevuto il contatto e che lvl,pos partecipa.    * # significa che abbiamo ricevuto il contatto e che lvl,_pos partecipa.
Line 51: Line 54:
=== Algoritmo di divulgazione della partecipazione ===
==== void publish_my_participation(p_id) ====
== Algoritmo di divulgazione della partecipazione ==
=== publish_my_participation ===
'''
void publish_my_participation(p_id)'''
Line 64: Line 68:
    * tcp_stub.peer_manager.set_participant(p_id, gn).
   * Se riceve eccezione RPCError: ignora.
    * tcp_stub.set_participant(p_id, gn).
   * Se riceve !StubError o !DeserializeError:
   * Ignora.
Line 68: Line 73:
   * br_stub.peer_manager.set_participant(p_id, gn).
  * Se riceve eccezione RPCError: ignora.
   * br_stub.set_participant(p_id, gn).
  * Se riceve !StubError o !DeserializeError:
  * Ignora.
Line 72: Line 78:
==== void set_participant(int p_id, PeerTupleGNode gn) ==== === set_participant ===
'''
void set_participant(int p_id, PeerTupleGNode gn)'''
Line 94: Line 101:
     * tcp_stub.peer_manager.set_participant(p_id, ret_gn).
    * Se riceve eccezione RPCError: ignora.
     * tcp_stub.set_participant(p_id, ret_gn).
    * Se riceve !StubError o !DeserializeError:
    * Ignora.
Line 98: Line 106:
    * br_stub.peer_manager.set_participant(p_id, ret_gn).
   * Se riceve eccezione RPCError: ignora.
    * br_stub.set_participant(p_id, ret_gn).
   * Se riceve !StubError o !DeserializeError:
   * Ignora.
Line 103: Line 112:

== Algoritmo di mantenimento di un database distribuito ==
=== begin_replica ===
'''bool begin_replica(q, p_id, x̄, r, timeout_exec, out IPeersResponse? resp, out IPeersContinuation cont)'''
 * Gli argomenti sono:
  * int ''q'': il numero delle repliche richieste,
  * int ''p_id'',
  * !PeerTupleNode ''x̄'': la tupla dell'hash della chiave del record, cioè h~-,,p,,-~ ( k ),
  * IPeersRequest ''r'': la richiesta di replicare la coppia k,val ,
  * int ''timeout_exec'',
  * ''resp'' viene valorizzato con la risposta o null;
  * ''cont'' è un oggetto di cui all'esterno si sa solo che implementa l'interfaccia vuota IPeersContinuation.
 * ''lista_repliche'' = new List di !PeerTupleNode.
 * ''exclude_tuple_list'' = new PeerTupleGNodeContainer(x̄.tuple.size).
 * cont = {q, p_id, x̄, r, timeout_exec, lista_repliche, exclude_tuple_list}.
 * Return next_replica(cont, out resp).

=== next_replica ===
'''bool next_replica(IPeersContinuation cont, out IPeersResponse? resp)'''
 * resp = null.
 * Se cont.lista_repliche.size ≥ cont.q:
  * Return False.
 * !PeerTupleNode ''respondant'';
 * ret = contact_peer(cont.p_id, cont.x̄, cont.r, cont.timeout_exec, True, out respondant, cont.exclude_tuple_list).
 * Se si riceve l'eccezione !PeersNoParticipantsInNetworkError:
  * Return False.
 * resp = ret.
 * aggiungi respondant a cont.lista_repliche.
 * aggiungi respondant a cont.exclude_tuple_list.
 * Return cont.lista_repliche.size < cont.q.

=== ttl_db_begin ===
I metodi ''ttl_db_*'' servono a gestire un database distribuito in cui i record hanno una scadenza, o TTL. Il modulo, oltre a fornire tali metodi, fornisce la classe !RequestSendKeys, la classe !RequestSendKeysResponse, l'interfaccia ITemporalDatabaseHandler.

Un presupposto, '''comune ai database temporali e ai database fixed_keys''', è che sia le chiavi sia i valori dei record siano istanze di classi derivate da ''Object'' e siano serializzabili.

La classe !RequestSendKeys è una classe serializzabile, che deriva da Object, e non ha alcun dato al suo interno. Implementa l'interfaccia (vuota) IPeersRequest. È la richiesta di inviare tutte le chiavi memorizzate.

La classe !RequestSendKeysResponse è una classe serializzabile, che deriva da Object, e contiene una lista di istanze di Object serializzabili. Implementa l'interfaccia (vuota) IPeersResponse. È la risposta alla richiesta di inviare tutte le chiavi memorizzate.

L'interfaccia ITemporalDatabaseHandler espone questi metodi:
 * ''int p_id'': una proprietà che dice l'identificativo di questo servizio.
 * ''int timeout_exec'': una proprietà che dice il tempo massimo di esecuzione per la richiesta !RequestSendKeys per questo servizio.
 * ''int msec_ttl'': una proprietà che dice il numero di millisecondi prima che un record di questo servizio scada.
 * ''bool is_valid_key(Object k)'': una funzione che dice se ''k'' è una chiave valida per il servizio.
 * ''bool key_equal_data(Object k1, Object k2)'' e ''uint key_hash_data(Object k)'': metodi le cui firme sono adatte per i delegati ''Gee.!EqualDataFunc<Object>'' e ''Gee.!HashDataFunc<Object>'', per costruire una !HashMap o una lista "con funzionalità di ricerca" di chiavi del servizio.
 * ''int max_records'': una proprietà che dice il numero massimo di record che il nodo può memorizzare per questo servizio.
 * ''int my_records_size()'': una funzione per sapere il numero di record per questo servizio attualmente memorizzati dal nodo.
 * ''bool my_records_contains(Object k)'': una funzione per sapere se il record per la chiave ''k'' per questo servizio è attualmente memorizzato dal nodo.
 * ''List<int> evaluate_hash_node(Object k, int lvl)'': una funzione che calcola la tupla ''h~-,,p,,-~(k)'' di questo servizio per la chiave ''k'' tenendo in considerazione che servono solo le posizioni interne al livello ''lvl''.

'''void ttl_db_begin(ITemporalDatabaseHandler tdh, List<int> tuple_n)'''
 * Viene avviata in una tasklet dalla classe del servizio, che deriva !PeerService.
 . Se si tratta di un servizio opzionale, viene chiamata solo dopo che sono state reperite con successo le mappe dei partecipanti ai servizi opzionali.
 . Gli argomenti del metodo ''ttl_db_begin'' sono:
  * ''tdh'': istanza di una classe che implementa ITemporalDatabaseHandler sopra descritta.
  * ''tuple_n'': una tupla con tutte le posizioni del nodo da 0 a ''levels'' - 1.
 * ''timer_non_exhaustive'' = un timer che scade dopo ''tdh.msec_ttl'' millisecondi.
 * ''removed_keys'' = new !ArrayList<Object>(''tdh.key_equal_data''): una lista vuota di chiavi cancellate su richiesta.
 * ''retrieving_keys'' = new !HashMap<Object,INtkdChannel>(''tdh.key_equal_data'', ''tdh.key_hash_data''): una mappa vuota di chiavi in corso di recupero associate ad un canale di comunicazione tra tasklet.
 * ''IPeersRequest r'' = new !RequestSendKeys().
 . '''Nota: spostare sulla descrizione di ''ttl_db_got_request''.''' Chi riceve tale richiesta risponde subito con tutte le chiavi ''k'' ∈ ''my_records.keys''.
 * Try:
  * ''!PeerTupleNode respondant'' = null. Sarà una tupla nel g-nodo di livello ''levels'', poiché ''tuple_n'' ha ''levels'' elementi.
  * Esegue ''IPeersResponse ret = contact_peer(tdh.p_id, tuple_n, r, tdh.timeout_exec, True, out respondant)''.
  . Il valore restituito dovrebbe essere un !RequestSendKeysResponse, cioè una lista di Object. Altrimenti la risposta viene ignorata.
  * Se ''ret'' è una istanza di !RequestSendKeysResponse:
   * Per ogni chiave ''k'' in ''ret'':
    * Se ''tdh.is_valid_key(k)'':
     * Se '''not''' ''my_records_contains(k)'' '''e''' ''k'' ∉ ''removed_keys'' '''e''' '''not''' ''retrieving_keys.has_key(k)'':
      * # Non sa nulla di ''k''.
      * Se dist(h~-,,p,,-~(k), n) < dist(h~-,,p,,-~(k), respondant):
       * Avvia il recupero di ''k''.
       * Attendi qualche istante per non gravare sulle prestazioni della rete.
  * Calcola ''l_n0'' = livello del massimo distinto g-nodo di ''respondant''.
  * Calcola ''p_n0'' = posizione del massimo distinto g-nodo di ''respondant''.
  * ''tuple_n'' = una tupla con tutte le posizioni del nodo da 0 a ''l_n0'' inclusi.
  * Prepara ''exclude_tuple_list'' = [] una lista di istanze di tuple globali nel g-nodo di ricerca di livello ''l_n0'' + 1.
  * Per ''i'' da 0 a ''gsize[l_n0]'' - 1: Se ''i'' ≠ ''p_n0'': Metti in ''exclude_tuple_list'' il g-nodo (''l_n0'', ''i'').
  * Metti in ''exclude_tuple_list'' il nodo ''respondant'', espresso come PeerTupleGNode di livello 0 nel g-nodo di livello ''l_n0'' + 1.
  * While ''tdh.my_records_size()'' + ''retrieving_keys.size'' < ''tdh.max_records'':
   * # la memoria destinata da ''n'' al servizio ''p'' non è esaurita.
   * Attendi qualche istante per non gravare sulle prestazioni della rete.
   * ''respondant'' = null. Sarà una tupla nel g-nodo di livello ''l_n0'' + 1, poiché ''tuple_n'' ha ''l_n0'' + 1 elementi.
   * Esegue ''ret = contact_peer(tdh.p_id, tuple_n, r, tdh.timeout_exec, True, out respondant, exclude_tuple_list)''.
   . Il valore restituito dovrebbe essere un !RequestSendKeysResponse, cioè una lista di Object. Altrimenti la risposta viene ignorata.
   * Se ''ret'' è una istanza di !RequestSendKeysResponse:
    * Per ogni chiave ''k'' in ''ret'':
     * Se ''tdh.is_valid_key(k)'':
      * Se '''not''' ''my_records_contains(k)'' '''e''' ''k'' ∉ ''removed_keys'' '''e''' '''not''' ''retrieving_keys.has_key(k)'':
       * # Non sa nulla di ''k''.
       * Se ''dist(dth.evaluate_hash_node(k, tuple_n.size), tuple_n)'' < ''dist(dth.evaluate_hash_node(k, tuple_n.size), respondant)'':
        * Esegue ''ttl_db_retrieve_record(tdh, k)''. Cioè avvia il recupero di ''k''.
        * Attendi qualche istante per non gravare sulle prestazioni della rete.
   * Metti in ''exclude_tuple_list'' il nodo ''respondant''.
 * Se riceve !PeersNoParticipantsInNetworkError:
  # L'algoritmo termina.

=== ttl_db_got_request ===
'''IPeersResponse ttl_db_got_request(ITemporalDatabaseHandler tdh, IPeersRequest r, Object k) throws !PeersRefuseExecutionError'''
 * Quando riceve una richiesta ''r'' per la chiave ''k'', la classe !PeerService del servizio ''p'' fa queste operazioni:
  * Se ''timer_non_exhaustive'' '''non''' è espirato:
   * Se ''k'' ∈ ''my_records.keys'':
    * Se ''r'' è di scrittura (o di lettura+scrittura) e prevede la cancellazione di ''k'':
     * Elabora normalmente la richiesta ''r'', rimuovendo l'elemento da ''my_records''.
     * Mette ''k'' in ''removed_keys''.
     * # L'algoritmo termina.
    * Altrimenti:
     * Elabora normalmente la richiesta ''r'', non sono previste variazioni in ''my_records.keys''.
     * # L'algoritmo termina.
   * Altrimenti-Se ''k'' ∈ ''removed_keys'':
    * Se ''r'' è di lettura o lettura+scrittura:
     * Restituisce al richiedente l'eccezione NOT_FOUND.
     * # L'algoritmo termina.
    * Altrimenti:
     * # ''r'' è di scrittura:
     * Rimuove ''k'' da ''removed_keys''.
     * Se la memoria destinata da ''n'' al servizio ''p'' non è esaurita (considerando ''my_records.size'' + ''retrieving_keys.size''):
      * Elabora normalmente la richiesta ''r'', aggiungendo un record in ''my_records[k]''.
      * # L'algoritmo termina.
     * Altrimenti:
      * Resetta il tempo di ''timer_non_exhaustive'' a TTL.
      * Rifiuta di elaborare ''r'' (perché ''out of memory'').
      * # L'algoritmo termina.
   * Altrimenti-Se ''k'' ∈ ''retrieving_keys.keys'':
    * Se ''r'' è di sola lettura:
     * Rifiuta di elaborare ''r'' (perché ''non esaustivo'').
     * # L'algoritmo termina.
    * Altrimenti:
     * # ''r'' è di scrittura o lettura+scrittura:
     * Attende di ricevere una comunicazione dal canale ''retrieving_keys[k]''.
     * Se ''r'' prevede una lettura prima della scrittura:
      * Se ''k'' ∉ ''my_records.keys'':
       * # Il nodo non ha il record ancora.
       * Mette ''k'' in ''removed_keys''.
       * Restituisce al richiedente l'eccezione NOT_FOUND.
       * # L'algoritmo termina.
      * Altrimenti:
       * Elabora normalmente la richiesta ''r''.
       * # L'algoritmo termina.
     * Altrimenti:
      * Elabora normalmente la richiesta ''r''.
      * # L'algoritmo termina.
   * Altrimenti:
    * # Non sa nulla di ''k''.
    * Se la memoria destinata da ''n'' al servizio ''p'' non è esaurita (considerando ''my_records.size'' + ''retrieving_keys.size''):
     * Mette un nuovo canale in ''retrieving_keys[k]''.
     * In una nuova tasklet:
      * Avvia il recupero di ''k''.
    * Resetta il tempo di ''timer_non_exhaustive'' a TTL.
    * Rifiuta di elaborare ''r'' (perché ''non esaustivo'').
    * # L'algoritmo termina.
  * Altrimenti:
   * # È ''esaustivo''.
   * Svuota la lista ''removed_keys''.
   * Se ''k'' ∈ ''my_records.keys'':
    * Elabora normalmente la richiesta ''r''.
    * # L'algoritmo termina.
   * Altrimenti:
    * Se ''r'' è di sola lettura o di lettura+scrittura:
     * Restituisce al richiedente l'eccezione NOT_FOUND.
     * # L'algoritmo termina.
    * Altrimenti:
     * # ''r'' è di scrittura:
     * Se la memoria destinata da ''n'' al servizio ''p'' non è esaurita (considerando ''my_records.size'' + ''retrieving_keys.size''):
      * Elabora normalmente la richiesta ''r'', aggiungendo un record in ''my_records[k]''.
      * # L'algoritmo termina.
     * Altrimenti:
      * Resetta il tempo di ''timer_non_exhaustive'' a TTL.
      * Rifiuta di elaborare ''r'' (perché ''out of memory'').
      * # L'algoritmo termina.

=== ttl_db_retrieve_record ===
'''internal void ttl_db_retrieve_record(ITemporalDatabaseHandler tdh, Object k)'''
 * Quando vuole recuperare il record per la chiave ''k'', la classe !PeerService del servizio ''p'' fa queste operazioni:
  * Prepara la richiesta ''r'' che dice "wait_then_send_record(k)".
  * '''Nota''': chi riceve una richiesta "wait_then_send_record(k)" la tratta come una normale richiesta di sola lettura per la chiave ''k'', a parte l'attesa.
  * Try:
   * Esegue ''ret = contact_peer(p_id, peer_tuple(k), r, timeout_exec, True, out respondant)''.
   * Se ''ret'' = !NotFoundError:
    * Se ''k'' ∉ ''removed_keys'':
     * Aggiunge ''k'' a ''removed_keys''.
    * Atomicamente (senza schedulare altre tasklet) invia un messaggio a tutti quelli che sono in attesa su ''retrieving_keys[k]'' e rimuove ''k'' da ''retrieving_keys''.
   * Altrimenti:
    * Scrive ''ret'' in ''my_records[k]''.
    * Rimuove ''k'' da ''removed_keys'', se c'era.
    * Atomicamente (senza schedulare altre tasklet) invia un messaggio a tutti quelli che sono in attesa su ''retrieving_keys[k]'' e rimuove ''k'' da ''retrieving_keys''.
  * Se riceve !PeersNoParticipantsInNetworkError:
   * Se ''k'' ∉ ''removed_keys'':
    * Aggiunge ''k'' a ''removed_keys''.
   * Atomicamente (senza schedulare altre tasklet) invia un messaggio a tutti quelli che sono in attesa su ''retrieving_keys[k]'' e rimuove ''k'' da ''retrieving_keys''.
  * Se riceve !PeersRefuseExecutionError:
   * Se ''k'' ∉ ''removed_keys'':
    * Aggiunge ''k'' a ''removed_keys''.
   * Atomicamente (senza schedulare altre tasklet) invia un messaggio a tutti quelli che sono in attesa su ''retrieving_keys[k]'' e rimuove ''k'' da ''retrieving_keys''.

=== fixed_keys_db_begin ===
'''void fixed_keys_db_begin(List<Object> K)'''
 * Quando il bootstrap è completo, la classe !PeerService del servizio ''p'' fa queste operazioni:
  * Crea una lista vuota di chiavi ''k'' di cui il recupero è ancora da iniziare, ''not_started_keys''.
  * Crea una mappa vuota di chiavi ''k'' in corso di recupero associate ad un canale di comunicazione tra tasklet, ''retrieving_keys''.
  * Per ogni chiave ''k'' nell'insieme completo ''K'':
   * Aggiunge ''k'' a ''not_started_keys''.
  * Per ogni chiave ''k'' nell'insieme completo ''K'':
   * Sia ''respondant'' il detentore corrente di ''k''. Lo trova con una chiamata a contact_peer in cui non chiede nulla.
   * Se dist(h~-,,p,,-~(k), n) < dist(h~-,,p,,-~(k), respondant):
    * Mette un nuovo canale in ''retrieving_keys[k]''.
    * In una nuova tasklet:
     * Avvia il recupero di ''k''.
    * Rimuove ''k'' da ''not_started_keys''.
    * Opzionalmente, a seconda di quante sono le chiavi, attendi qualche istante per non gravare sulle prestazioni della rete.
   * Altrimenti:
    * Rimuove ''k'' da ''not_started_keys''.

=== fixed_keys_db_got_request ===
'''IPeersResponse fixed_keys_db_got_request(IPeersRequest r, Object k) throws !PeersRefuseExecutionError'''
 * Quando riceve una richiesta ''r'' per la chiave ''k'', la classe !PeerService del servizio ''p'' fa queste operazioni:
  * Se k ∈ not_started_keys:
   * Rifiuta di elaborare ''r'' (perché ''non esaustivo'').
  * Se k ∈ retrieving_keys.keys:
   * Se ''r'' è di sola lettura:
    * Rifiuta di elaborare ''r'' (perché ''non esaustivo'').
    * # L'algoritmo termina.
   * Altrimenti:
    * # ''r'' è di scrittura:
    * Attende di ricevere una comunicazione dal canale ''retrieving_keys[k]''.
    * Elabora normalmente la richiesta ''r''.
    * # L'algoritmo termina.
  * Altrimenti:
   * Elabora normalmente la richiesta ''r''.
   * # L'algoritmo termina.

=== fixed_keys_db_retrieve_record ===
'''internal void fixed_keys_db_retrieve_record(Object k)'''
 * Quando vuole recuperare il record per la chiave ''k'', la classe !PeerService del servizio ''p'' fa queste operazioni:
  * Prepara la richiesta ''r'' che dice "wait_then_send_record(k)".
  * '''Nota''': chi riceve una richiesta "wait_then_send_record(k)" la tratta come una normale richiesta di sola lettura per la chiave ''k'', a parte l'attesa.
  * Try:
   * Esegue ''ret = contact_peer(p_id, peer_tuple(k), r, timeout_exec, True, out respondant)''.
   * Scrive ''ret'' in ''my_records[k]''.
  * Se riceve !PeersNoParticipantsInNetworkError oppure !PeersRefuseExecutionError:
   * Valorizza ''my_records[k]'' con un appropriato default.
  * Atomicamente (senza schedulare altre tasklet) invia un messaggio a tutti quelli che sono in attesa su ''retrieving_keys[k]'' e rimuove ''k'' da ''retrieving_keys''.

Modulo PeerServices - Appunti - Algoritmi 2 / 2

Algoritmo di rilevamento di non partecipazione

check_non_participation

bool check_non_participation(p_id, lvl, _pos)

  • In questo algoritmo non ci interessa sapere se un g-nodo partecipa, ma solo se è possibile dire con certezza che esso non partecipa. In caso di incertezza l'algoritmo restituisce False.
  • Produci = la tupla x̄0·x̄1·...·x̄lvl-1 dove x̄i = 0 per ogni i. La tupla identifica un indirizzo a caso all'interno del g-nodo g = (lvl, _pos). Se lvl = 0 allora x̄ è null.

  • Produci n = make_tuple_node(new HCoord(0, pos[0]), lvl+1) , cioè la tupla n0·n1·...·nlvl. La tupla che identifica il nodo corrente nel g-nodo di livello lvl+1 in cui il messaggio si muoverà.

  • m’ = new PeerMessageForwarder.

  • m’.n = n.

  • m’.x̄ = x̄.

  • m’.lvl = lvl.

  • m’.pos = _pos.

  • m’.p_id = p_id.

  • m’.msg_id = un identificativo generato a caso per questo messaggio.

  • Calcola timeout_instradamento = f ( map_paths.i_peers_get_nodes_in_my_group(lvl + 1) ).

  • Prepara waiting_answer = new WaitingAnswer(null, (lvl,_pos) come PeerTupleGNode con top = lvl+1). Il fatto che l'istanza di IPeersRequest è a null fa in modo che i metodi remoti che ricevono le notifiche si comportano in modo adeguato. Sostanzialmente dovrebbe cambiare solo il fatto che quando si riceve la segnalazione di get_request si risponde sempre con l'eccezione PeersUnknownMessageError, ache se si è potuto recuperare l'istanza di WaitingAnswer. Sull'istanza di WaitingAnswer viene poi valorizzato il membro response con qualcosa diverso da null solo per indicare che il g-nodo partecipa.

  • waiting_answer_map[m’.msg_id] = waiting_answer.

  • IPeersManagerStub gwstub

  • IPeersManagerStub? failed = null

  • While True:
    • Try:
      • Calcola gwstub = map_paths.i_peers_gateway(lvl, _pos, null, failed)
    • Se riceve l'eccezione PeersNonexistentDestinationError:

      • Restituisci True. Rimuovi waiting_answer_map[m’.msg_id]. Termina algoritmo.
    • Try:
      • Esegue gwstub.forward_peer_message(m’).
    • Se riceve StubError o DeserializeError:

      • failed = gwstub.
      • Continua con la prossima iterazione del ciclo.
    • Esci dal ciclo.
  • Try:
    • Sta in attesa su waiting_answer.ch per max timeout_instradamento.
    • Se waiting_answer.exclude_gnode ≠ null:
      • Restituisce False. Rimuovi waiting_answer_map[m’.msg_id]. Termina algoritmo.
    • Altrimenti-Se waiting_answer.non_participant_gnode ≠ null:
      • # significa che abbiamo ricevuto notizia di un gnodo non partecipante.
      • waiting_answer.non_participant_gnode è un PeerTupleGNode che rappresenta un g-nodo h dentro il mio g-nodo di livello top.

      • Se è visibile nella mia mappa, cioè se (lvl,top) non partecipa:
        • Restituisci True. Rimuovi waiting_answer_map[m’.msg_id]. Termina algoritmo.
      • Altrimenti:
        • Restituisce False. Rimuovi waiting_answer_map[m’.msg_id]. Termina algoritmo.
    • Altrimenti-Se waiting_answer.response ≠ null:
      • # significa che abbiamo ricevuto il contatto e che lvl,_pos partecipa.
      • Restituisce False. Rimuovi waiting_answer_map[m’.msg_id]. Termina algoritmo.
    • Altrimenti:
      • # significa che abbiamo ricevuto un nuovo valore in waiting_answer.min_target.
      • Restituisce False. Rimuovi waiting_answer_map[m’.msg_id]. Termina algoritmo.
  • Se riceve l'eccezione TimeoutError:

    • # dobbiamo trattare waiting_answer.min_target come da escludere.
    • Restituisce False. Rimuovi waiting_answer_map[m’.msg_id]. Termina algoritmo.

Algoritmo di divulgazione della partecipazione

publish_my_participation

void publish_my_participation(p_id)

  • gn = make_tuple_gnode(new HCoord(0, pos[0]), levels). La tupla n0·n1·...·nlevels-1, che identifica il nodo corrente nella rete.

  • tempo_attesa = 300 secondi.

  • iterazioni = 5.

  • While True (per sempre):
    • Se iterazioni > 0:

      • Decrementa iterazioni di 1.
    • Altrimenti:
      • tempo_attesa = 1 giorno + random(1..24*60*60) secondi.
    • Prepara un IPeersMissingArcHandler missing_handler che in caso di invocazione esegua:

      • Calcola tcp_stub = neighbors_factory.i_peers_get_tcp(missing_arc).
      • Try:
        • tcp_stub.set_participant(p_id, gn).
      • Se riceve StubError o DeserializeError:

        • Ignora.
    • Calcola br_stub = neighbors_factory.i_peers_get_broadcast(missing_handler).
    • Try:
      • br_stub.set_participant(p_id, gn).
    • Se riceve StubError o DeserializeError:

      • Ignora.
    • Aspetta tempo_attesa.

set_participant

void set_participant(int p_id, PeerTupleGNode gn)

  • E' già stata istanziata lista_recenti un ArrayList di HCoord.

  • Se services.has_key(p_id) AND NOT services[p_id].p_is_optional:
    • Ignora il messaggio. Algoritmo termina.
  • int case, HCoord ret.

  • Calcola convert_tuple_gnode(gn, out case, out ret).
  • Se case = 1:
    • Cioè gn rappresenta un mio g-nodo.
    • Ignora il messaggio. Algoritmo termina.
  • Altrimenti:
    • Cioè gn rappresenta un g-nodo a cui io non appartengo, ed ho già calcolato in ret il g-nodo visibile nella mia topologia in cui gn si trova.
    • Se ret ∈ lista_recenti:
      • Ignora il messaggio. Algoritmo termina.
    • Altrimenti:
      • lista_recenti.add(ret).
      • Se NOT participant_maps.has_key(p_id):
        • participant_maps[p_id] = new ParticipantMap().

      • participant_maps[p_id].participant_list.add(ret).
      • ret_gn = make_tuple_gnode(ret, levels)

      • Prepara un IPeersMissingArcHandler missing_handler che in caso di invocazione esegua:

        • Calcola tcp_stub = neighbors_factory.i_peers_get_tcp(missing_arc).
        • Try:
          • tcp_stub.set_participant(p_id, ret_gn).
        • Se riceve StubError o DeserializeError:

          • Ignora.
      • Calcola br_stub = neighbors_factory.i_peers_get_broadcast(missing_handler).
      • Try:
        • br_stub.set_participant(p_id, ret_gn).
      • Se riceve StubError o DeserializeError:

        • Ignora.
      • Svolgi quanto segue in una nuova tasklet portando dietro ret:
        • Aspetta 60 secondi.
        • lista_recenti.remove(ret).

Algoritmo di mantenimento di un database distribuito

begin_replica

bool begin_replica(q, p_id, x̄, r, timeout_exec, out IPeersResponse? resp, out IPeersContinuation cont)

  • Gli argomenti sono:
    • int q: il numero delle repliche richieste,

    • int p_id,

    • PeerTupleNode : la tupla dell'hash della chiave del record, cioè hp ( k ),

    • IPeersRequest r: la richiesta di replicare la coppia k,val ,

    • int timeout_exec,

    • resp viene valorizzato con la risposta o null;

    • cont è un oggetto di cui all'esterno si sa solo che implementa l'interfaccia vuota IPeersContinuation.

  • lista_repliche = new List di PeerTupleNode.

  • exclude_tuple_list = new PeerTupleGNodeContainer(x̄.tuple.size).

  • cont = {q, p_id, x̄, r, timeout_exec, lista_repliche, exclude_tuple_list}.
  • Return next_replica(cont, out resp).

next_replica

bool next_replica(IPeersContinuation cont, out IPeersResponse? resp)

  • resp = null.
  • Se cont.lista_repliche.size ≥ cont.q:
    • Return False.
  • PeerTupleNode respondant;

  • ret = contact_peer(cont.p_id, cont.x̄, cont.r, cont.timeout_exec, True, out respondant, cont.exclude_tuple_list).
  • Se si riceve l'eccezione PeersNoParticipantsInNetworkError:

    • Return False.
  • resp = ret.
  • aggiungi respondant a cont.lista_repliche.
  • aggiungi respondant a cont.exclude_tuple_list.
  • Return cont.lista_repliche.size < cont.q.

ttl_db_begin

I metodi ttl_db_* servono a gestire un database distribuito in cui i record hanno una scadenza, o TTL. Il modulo, oltre a fornire tali metodi, fornisce la classe RequestSendKeys, la classe RequestSendKeysResponse, l'interfaccia ITemporalDatabaseHandler.

Un presupposto, comune ai database temporali e ai database fixed_keys, è che sia le chiavi sia i valori dei record siano istanze di classi derivate da Object e siano serializzabili.

La classe RequestSendKeys è una classe serializzabile, che deriva da Object, e non ha alcun dato al suo interno. Implementa l'interfaccia (vuota) IPeersRequest. È la richiesta di inviare tutte le chiavi memorizzate.

La classe RequestSendKeysResponse è una classe serializzabile, che deriva da Object, e contiene una lista di istanze di Object serializzabili. Implementa l'interfaccia (vuota) IPeersResponse. È la risposta alla richiesta di inviare tutte le chiavi memorizzate.

L'interfaccia ITemporalDatabaseHandler espone questi metodi:

  • int p_id: una proprietà che dice l'identificativo di questo servizio.

  • int timeout_exec: una proprietà che dice il tempo massimo di esecuzione per la richiesta RequestSendKeys per questo servizio.

  • int msec_ttl: una proprietà che dice il numero di millisecondi prima che un record di questo servizio scada.

  • bool is_valid_key(Object k): una funzione che dice se k è una chiave valida per il servizio.

  • bool key_equal_data(Object k1, Object k2) e uint key_hash_data(Object k): metodi le cui firme sono adatte per i delegati Gee.EqualDataFunc<Object> e Gee.HashDataFunc<Object>, per costruire una HashMap o una lista "con funzionalità di ricerca" di chiavi del servizio.

  • int max_records: una proprietà che dice il numero massimo di record che il nodo può memorizzare per questo servizio.

  • int my_records_size(): una funzione per sapere il numero di record per questo servizio attualmente memorizzati dal nodo.

  • bool my_records_contains(Object k): una funzione per sapere se il record per la chiave k per questo servizio è attualmente memorizzato dal nodo.

  • List<int> evaluate_hash_node(Object k, int lvl): una funzione che calcola la tupla hp(k) di questo servizio per la chiave k tenendo in considerazione che servono solo le posizioni interne al livello lvl.

void ttl_db_begin(ITemporalDatabaseHandler tdh, List<int> tuple_n)

  • Viene avviata in una tasklet dalla classe del servizio, che deriva PeerService.

  • Se si tratta di un servizio opzionale, viene chiamata solo dopo che sono state reperite con successo le mappe dei partecipanti ai servizi opzionali.
  • Gli argomenti del metodo ttl_db_begin sono:

    • tdh: istanza di una classe che implementa ITemporalDatabaseHandler sopra descritta.

    • tuple_n: una tupla con tutte le posizioni del nodo da 0 a levels - 1.

  • timer_non_exhaustive = un timer che scade dopo tdh.msec_ttl millisecondi.

  • removed_keys = new ArrayList<Object>(tdh.key_equal_data): una lista vuota di chiavi cancellate su richiesta.

  • retrieving_keys = new HashMap<Object,INtkdChannel>(tdh.key_equal_data, tdh.key_hash_data): una mappa vuota di chiavi in corso di recupero associate ad un canale di comunicazione tra tasklet.

  • IPeersRequest r = new RequestSendKeys().

  • Nota: spostare sulla descrizione di ttl_db_got_request. Chi riceve tale richiesta risponde subito con tutte le chiavi kmy_records.keys.

  • Try:
    • PeerTupleNode respondant = null. Sarà una tupla nel g-nodo di livello levels, poiché tuple_n ha levels elementi.

    • Esegue IPeersResponse ret = contact_peer(tdh.p_id, tuple_n, r, tdh.timeout_exec, True, out respondant).

    • Il valore restituito dovrebbe essere un RequestSendKeysResponse, cioè una lista di Object. Altrimenti la risposta viene ignorata.

    • Se ret è una istanza di RequestSendKeysResponse:

      • Per ogni chiave k in ret:

        • Se tdh.is_valid_key(k):

          • Se not my_records_contains(k) e kremoved_keys e not retrieving_keys.has_key(k):

            • # Non sa nulla di k.

            • Se dist(hp(k), n) < dist(hp(k), respondant):

              • Avvia il recupero di k.

              • Attendi qualche istante per non gravare sulle prestazioni della rete.
    • Calcola l_n0 = livello del massimo distinto g-nodo di respondant.

    • Calcola p_n0 = posizione del massimo distinto g-nodo di respondant.

    • tuple_n = una tupla con tutte le posizioni del nodo da 0 a l_n0 inclusi.

    • Prepara exclude_tuple_list = [] una lista di istanze di tuple globali nel g-nodo di ricerca di livello l_n0 + 1.

    • Per i da 0 a gsize[l_n0] - 1: Se ip_n0: Metti in exclude_tuple_list il g-nodo (l_n0, i).

    • Metti in exclude_tuple_list il nodo respondant, espresso come PeerTupleGNode di livello 0 nel g-nodo di livello l_n0 + 1.

    • While tdh.my_records_size() + retrieving_keys.size < tdh.max_records:

      • # la memoria destinata da n al servizio p non è esaurita.

      • Attendi qualche istante per non gravare sulle prestazioni della rete.
      • respondant = null. Sarà una tupla nel g-nodo di livello l_n0 + 1, poiché tuple_n ha l_n0 + 1 elementi.

      • Esegue ret = contact_peer(tdh.p_id, tuple_n, r, tdh.timeout_exec, True, out respondant, exclude_tuple_list).

      • Il valore restituito dovrebbe essere un RequestSendKeysResponse, cioè una lista di Object. Altrimenti la risposta viene ignorata.

      • Se ret è una istanza di RequestSendKeysResponse:

        • Per ogni chiave k in ret:

          • Se tdh.is_valid_key(k):

            • Se not my_records_contains(k) e kremoved_keys e not retrieving_keys.has_key(k):

              • # Non sa nulla di k.

              • Se dist(dth.evaluate_hash_node(k, tuple_n.size), tuple_n) < dist(dth.evaluate_hash_node(k, tuple_n.size), respondant):

                • Esegue ttl_db_retrieve_record(tdh, k). Cioè avvia il recupero di k.

                • Attendi qualche istante per non gravare sulle prestazioni della rete.
      • Metti in exclude_tuple_list il nodo respondant.

  • Se riceve PeersNoParticipantsInNetworkError:

    • # L'algoritmo termina.

ttl_db_got_request

IPeersResponse ttl_db_got_request(ITemporalDatabaseHandler tdh, IPeersRequest r, Object k) throws PeersRefuseExecutionError

  • Quando riceve una richiesta r per la chiave k, la classe PeerService del servizio p fa queste operazioni:

    • Se timer_non_exhaustive non è espirato:

      • Se kmy_records.keys:

        • Se r è di scrittura (o di lettura+scrittura) e prevede la cancellazione di k:

          • Elabora normalmente la richiesta r, rimuovendo l'elemento da my_records.

          • Mette k in removed_keys.

          • # L'algoritmo termina.
        • Altrimenti:
          • Elabora normalmente la richiesta r, non sono previste variazioni in my_records.keys.

          • # L'algoritmo termina.
      • Altrimenti-Se kremoved_keys:

        • Se r è di lettura o lettura+scrittura:

          • Restituisce al richiedente l'eccezione NOT_FOUND.
          • # L'algoritmo termina.
        • Altrimenti:
          • # r è di scrittura:

          • Rimuove k da removed_keys.

          • Se la memoria destinata da n al servizio p non è esaurita (considerando my_records.size + retrieving_keys.size):

            • Elabora normalmente la richiesta r, aggiungendo un record in my_records[k].

            • # L'algoritmo termina.
          • Altrimenti:
            • Resetta il tempo di timer_non_exhaustive a TTL.

            • Rifiuta di elaborare r (perché out of memory).

            • # L'algoritmo termina.
      • Altrimenti-Se kretrieving_keys.keys:

        • Se r è di sola lettura:

          • Rifiuta di elaborare r (perché non esaustivo).

          • # L'algoritmo termina.
        • Altrimenti:
          • # r è di scrittura o lettura+scrittura:

          • Attende di ricevere una comunicazione dal canale retrieving_keys[k].

          • Se r prevede una lettura prima della scrittura:

            • Se kmy_records.keys:

              • # Il nodo non ha il record ancora.
              • Mette k in removed_keys.

              • Restituisce al richiedente l'eccezione NOT_FOUND.
              • # L'algoritmo termina.
            • Altrimenti:
              • Elabora normalmente la richiesta r.

              • # L'algoritmo termina.
          • Altrimenti:
            • Elabora normalmente la richiesta r.

            • # L'algoritmo termina.
      • Altrimenti:
        • # Non sa nulla di k.

        • Se la memoria destinata da n al servizio p non è esaurita (considerando my_records.size + retrieving_keys.size):

          • Mette un nuovo canale in retrieving_keys[k].

          • In una nuova tasklet:
            • Avvia il recupero di k.

        • Resetta il tempo di timer_non_exhaustive a TTL.

        • Rifiuta di elaborare r (perché non esaustivo).

        • # L'algoritmo termina.
    • Altrimenti:
      • # È esaustivo.

      • Svuota la lista removed_keys.

      • Se kmy_records.keys:

        • Elabora normalmente la richiesta r.

        • # L'algoritmo termina.
      • Altrimenti:
        • Se r è di sola lettura o di lettura+scrittura:

          • Restituisce al richiedente l'eccezione NOT_FOUND.
          • # L'algoritmo termina.
        • Altrimenti:
          • # r è di scrittura:

          • Se la memoria destinata da n al servizio p non è esaurita (considerando my_records.size + retrieving_keys.size):

            • Elabora normalmente la richiesta r, aggiungendo un record in my_records[k].

            • # L'algoritmo termina.
          • Altrimenti:
            • Resetta il tempo di timer_non_exhaustive a TTL.

            • Rifiuta di elaborare r (perché out of memory).

            • # L'algoritmo termina.

ttl_db_retrieve_record

internal void ttl_db_retrieve_record(ITemporalDatabaseHandler tdh, Object k)

  • Quando vuole recuperare il record per la chiave k, la classe PeerService del servizio p fa queste operazioni:

    • Prepara la richiesta r che dice "wait_then_send_record(k)".

    • Nota: chi riceve una richiesta "wait_then_send_record(k)" la tratta come una normale richiesta di sola lettura per la chiave k, a parte l'attesa.

    • Try:
      • Esegue ret = contact_peer(p_id, peer_tuple(k), r, timeout_exec, True, out respondant).

      • Se ret = NotFoundError:

        • Se kremoved_keys:

          • Aggiunge k a removed_keys.

        • Atomicamente (senza schedulare altre tasklet) invia un messaggio a tutti quelli che sono in attesa su retrieving_keys[k] e rimuove k da retrieving_keys.

      • Altrimenti:
        • Scrive ret in my_records[k].

        • Rimuove k da removed_keys, se c'era.

        • Atomicamente (senza schedulare altre tasklet) invia un messaggio a tutti quelli che sono in attesa su retrieving_keys[k] e rimuove k da retrieving_keys.

    • Se riceve PeersNoParticipantsInNetworkError:

      • Se kremoved_keys:

        • Aggiunge k a removed_keys.

      • Atomicamente (senza schedulare altre tasklet) invia un messaggio a tutti quelli che sono in attesa su retrieving_keys[k] e rimuove k da retrieving_keys.

    • Se riceve PeersRefuseExecutionError:

      • Se kremoved_keys:

        • Aggiunge k a removed_keys.

      • Atomicamente (senza schedulare altre tasklet) invia un messaggio a tutti quelli che sono in attesa su retrieving_keys[k] e rimuove k da retrieving_keys.

fixed_keys_db_begin

void fixed_keys_db_begin(List<Object> K)

  • Quando il bootstrap è completo, la classe PeerService del servizio p fa queste operazioni:

    • Crea una lista vuota di chiavi k di cui il recupero è ancora da iniziare, not_started_keys.

    • Crea una mappa vuota di chiavi k in corso di recupero associate ad un canale di comunicazione tra tasklet, retrieving_keys.

    • Per ogni chiave k nell'insieme completo K:

      • Aggiunge k a not_started_keys.

    • Per ogni chiave k nell'insieme completo K:

      • Sia respondant il detentore corrente di k. Lo trova con una chiamata a contact_peer in cui non chiede nulla.

      • Se dist(hp(k), n) < dist(hp(k), respondant):

        • Mette un nuovo canale in retrieving_keys[k].

        • In una nuova tasklet:
          • Avvia il recupero di k.

        • Rimuove k da not_started_keys.

        • Opzionalmente, a seconda di quante sono le chiavi, attendi qualche istante per non gravare sulle prestazioni della rete.
      • Altrimenti:
        • Rimuove k da not_started_keys.

fixed_keys_db_got_request

IPeersResponse fixed_keys_db_got_request(IPeersRequest r, Object k) throws PeersRefuseExecutionError

  • Quando riceve una richiesta r per la chiave k, la classe PeerService del servizio p fa queste operazioni:

    • Se k ∈ not_started_keys:
      • Rifiuta di elaborare r (perché non esaustivo).

    • Se k ∈ retrieving_keys.keys:
      • Se r è di sola lettura:

        • Rifiuta di elaborare r (perché non esaustivo).

        • # L'algoritmo termina.
      • Altrimenti:
        • # r è di scrittura:

        • Attende di ricevere una comunicazione dal canale retrieving_keys[k].

        • Elabora normalmente la richiesta r.

        • # L'algoritmo termina.
    • Altrimenti:
      • Elabora normalmente la richiesta r.

      • # L'algoritmo termina.

fixed_keys_db_retrieve_record

internal void fixed_keys_db_retrieve_record(Object k)

  • Quando vuole recuperare il record per la chiave k, la classe PeerService del servizio p fa queste operazioni:

    • Prepara la richiesta r che dice "wait_then_send_record(k)".

    • Nota: chi riceve una richiesta "wait_then_send_record(k)" la tratta come una normale richiesta di sola lettura per la chiave k, a parte l'attesa.

    • Try:
      • Esegue ret = contact_peer(p_id, peer_tuple(k), r, timeout_exec, True, out respondant).

      • Scrive ret in my_records[k].

    • Se riceve PeersNoParticipantsInNetworkError oppure PeersRefuseExecutionError:

      • Valorizza my_records[k] con un appropriato default.

    • Atomicamente (senza schedulare altre tasklet) invia un messaggio a tutti quelli che sono in attesa su retrieving_keys[k] e rimuove k da retrieving_keys.

Netsukuku/ita/docs/ModuloPeers/AppuntiAlgo2 (last edited 2015-11-28 11:14:43 by lukisi)