| Size: 6249 Comment:  | Size: 21853 Comment:  | 
| Deletions are marked like this. | Additions are marked like this. | 
| Line 1: | Line 1: | 
| = Modulo PeerServices - Appunti - Algoritmi principali = === Algoritmo di rilevamento non partecipazione === ==== bool check_non_participation(p_id, lvl, pos) ==== * In questo algoritmo non ci interessa sapere se un g-nodo partecipa, ma solo se è possibile dire con certezza che esso non partecipa. Se è incerto restituisce False, esattamente come farebbe se potesse dire con certezza che partecipa. * Produci ''x̄'' = la tupla x̄~-,,0,,-~·x̄~-,,1,,-~·...·x̄~-,,lvl-1,,-~ dove x̄~-,,i,,-~ = 0 per ogni i. La tupla identifica un indirizzo a caso all'interno del g-nodo ''g'' = (lvl, pos). Se lvl = 0 allora x̄ è null. * Produci ''n'' = la tupla n~-,,0,,-~·n~-,,1,,-~·...·n~-,,lvl,,-~. La tupla che identifica il nodo corrente nel g-nodo di livello lvl+1 in cui il messaggio si muoverà. | = Modulo PeerServices - Appunti - Algoritmi 2 / 2 = <<TableOfContents(4)>> == Algoritmo di rilevamento di non partecipazione == === check_non_participation === '''bool check_non_participation(p_id, lvl, _pos)''' * In questo algoritmo non ci interessa sapere se un g-nodo partecipa, ma solo se è possibile dire con certezza che esso non partecipa. In caso di incertezza l'algoritmo restituisce False. * Produci ''x̄'' = la tupla x̄~-,,0,,-~·x̄~-,,1,,-~·...·x̄~-,,lvl-1,,-~ dove x̄~-,,i,,-~ = 0 per ogni i. La tupla identifica un indirizzo a caso all'interno del g-nodo ''g'' = (lvl, _pos). Se lvl = 0 allora x̄ è null. * Produci ''n'' = make_tuple_node(new HCoord(0, pos[0]), lvl+1) , cioè la tupla n~-,,0,,-~·n~-,,1,,-~·...·n~-,,lvl,,-~. La tupla che identifica il nodo corrente nel g-nodo di livello lvl+1 in cui il messaggio si muoverà. | 
| Line 11: | Line 14: | 
| * ''m’.pos'' = pos. | * ''m’.pos'' = _pos. | 
| Line 15: | Line 18: | 
| * Prepara ''waiting_answer'' = new !WaitingAnswer(null, (lvl,pos) come PeerTupleGNode con top = lvl+1). ~-Il fatto che l'istanza di !RemoteCall è a null fa in modo che i metodi remoti che ricevono le notifiche si comportano in modo adeguato. Sostanzialmente dovrebbe cambiare solo il fatto che quando si riceve la segnalazione di get_request si risponde sempre con l'eccezione !PeersUnknownMessageError, ache se si è potuto recuperare l'istanza di !WaitingAnswer. Sull'istanza di !WaitingAnswer viene poi valorizzato il membro response con un !SerializableNone solo per indicare che il g-nodo partecipa.-~ | * Prepara ''waiting_answer'' = new !WaitingAnswer(null, (lvl,_pos) come PeerTupleGNode con top = lvl+1). ~-Il fatto che l'istanza di IPeersRequest è a null fa in modo che i metodi remoti che ricevono le notifiche si comportano in modo adeguato. Sostanzialmente dovrebbe cambiare solo il fatto che quando si riceve la segnalazione di get_request si risponde sempre con l'eccezione !PeersUnknownMessageError, ache se si è potuto recuperare l'istanza di !WaitingAnswer. Sull'istanza di !WaitingAnswer viene poi valorizzato il membro response con qualcosa diverso da ''null'' solo per indicare che il g-nodo partecipa.-~ | 
| Line 17: | Line 20: | 
| * IAddressManagerRootDispatcher ''gwstub'' * IAddressManagerRootDispatcher? ''failed'' = null | * IPeersManagerStub ''gwstub'' * IPeersManagerStub? ''failed'' = null | 
| Line 21: | Line 24: | 
| * Calcola gwstub = map_paths.i_peers_gateway(lvl, pos, null, failed) | * Calcola gwstub = map_paths.i_peers_gateway(lvl, _pos, null, failed) | 
| Line 25: | Line 28: | 
| * Esegue gwstub.peers_manager.forward_peer_message(m’). * Se riceve l'eccezione RPCError: | * Esegue gwstub.forward_peer_message(m’). * Se riceve !StubError o !DeserializeError: | 
| Line 42: | Line 45: | 
| * # significa che abbiamo ricevuto il contatto e che lvl,pos partecipa. | * # significa che abbiamo ricevuto il contatto e che lvl,_pos partecipa. | 
| Line 51: | Line 54: | 
| === Algoritmo di divulgazione della partecipazione === ==== void publish_my_participation(p_id) ==== | == Algoritmo di divulgazione della partecipazione == === publish_my_participation === '''void publish_my_participation(p_id)''' | 
| Line 64: | Line 68: | 
| * tcp_stub.peer_manager.set_participant(p_id, gn). * Se riceve eccezione RPCError: ignora. | * tcp_stub.set_participant(p_id, gn). * Se riceve !StubError o !DeserializeError: * Ignora. | 
| Line 68: | Line 73: | 
| * br_stub.peer_manager.set_participant(p_id, gn). * Se riceve eccezione RPCError: ignora. | * br_stub.set_participant(p_id, gn). * Se riceve !StubError o !DeserializeError: * Ignora. | 
| Line 72: | Line 78: | 
| ==== void set_participant(int p_id, PeerTupleGNode gn) ==== | === set_participant === '''void set_participant(int p_id, PeerTupleGNode gn)''' | 
| Line 83: | Line 90: | 
| * Se lista_recenti.contains(ret): | * Se ret ∈ lista_recenti: | 
| Line 94: | Line 101: | 
| * tcp_stub.peer_manager.set_participant(p_id, ret_gn). * Se riceve eccezione RPCError: ignora. | * tcp_stub.set_participant(p_id, ret_gn). * Se riceve !StubError o !DeserializeError: * Ignora. | 
| Line 98: | Line 106: | 
| * br_stub.peer_manager.set_participant(p_id, ret_gn). * Se riceve eccezione RPCError: ignora. | * br_stub.set_participant(p_id, ret_gn). * Se riceve !StubError o !DeserializeError: * Ignora. | 
| Line 103: | Line 112: | 
| == Algoritmo di mantenimento di un database distribuito == === begin_replica === '''bool begin_replica(q, p_id, x̄, r, timeout_exec, out IPeersResponse? resp, out IPeersContinuation cont)''' * Gli argomenti sono: * int ''q'': il numero delle repliche richieste, * int ''p_id'', * !PeerTupleNode ''x̄'': la tupla dell'hash della chiave del record, cioè h~-,,p,,-~ ( k ), * IPeersRequest ''r'': la richiesta di replicare la coppia k,val , * int ''timeout_exec'', * ''resp'' viene valorizzato con la risposta o null; * ''cont'' è un oggetto di cui all'esterno si sa solo che implementa l'interfaccia vuota IPeersContinuation. * ''lista_repliche'' = new List di !PeerTupleNode. * ''exclude_tuple_list'' = new PeerTupleGNodeContainer(x̄.tuple.size). * cont = {q, p_id, x̄, r, timeout_exec, lista_repliche, exclude_tuple_list}. * Return next_replica(cont, out resp). === next_replica === '''bool next_replica(IPeersContinuation cont, out IPeersResponse? resp)''' * resp = null. * Se cont.lista_repliche.size ≥ cont.q: * Return False. * !PeerTupleNode ''respondant''; * ret = contact_peer(cont.p_id, cont.x̄, cont.r, cont.timeout_exec, True, out respondant, cont.exclude_tuple_list). * Se si riceve l'eccezione !PeersNoParticipantsInNetworkError: * Return False. * resp = ret. * aggiungi respondant a cont.lista_repliche. * aggiungi respondant a cont.exclude_tuple_list. * Return cont.lista_repliche.size < cont.q. === ttl_db_begin === I metodi ''ttl_db_*'' servono a gestire un database distribuito in cui i record hanno una scadenza, o TTL. Il modulo, oltre a fornire tali metodi, fornisce la classe !RequestSendKeys, la classe !RequestSendKeysResponse, l'interfaccia ITemporalDatabaseHandler. Un presupposto, '''comune ai database temporali e ai database fixed_keys''', è che sia le chiavi sia i valori dei record siano istanze di classi derivate da ''Object'' e siano serializzabili. La classe !RequestSendKeys è una classe serializzabile, che deriva da Object, e non ha alcun dato al suo interno. Implementa l'interfaccia (vuota) IPeersRequest. È la richiesta di inviare tutte le chiavi memorizzate. La classe !RequestSendKeysResponse è una classe serializzabile, che deriva da Object, e contiene una lista di istanze di Object serializzabili. Implementa l'interfaccia (vuota) IPeersResponse. È la risposta alla richiesta di inviare tutte le chiavi memorizzate. L'interfaccia ITemporalDatabaseHandler espone questi metodi: * ''int p_id'': una proprietà che dice l'identificativo di questo servizio. * ''int timeout_exec'': una proprietà che dice il tempo massimo di esecuzione per la richiesta !RequestSendKeys per questo servizio. * ''int msec_ttl'': una proprietà che dice il numero di millisecondi prima che un record di questo servizio scada. * ''bool is_valid_key(Object k)'': una funzione che dice se ''k'' è una chiave valida per il servizio. * ''bool key_equal_data(Object k1, Object k2)'' e ''uint key_hash_data(Object k)'': metodi le cui firme sono adatte per i delegati ''Gee.!EqualDataFunc<Object>'' e ''Gee.!HashDataFunc<Object>'', per costruire una !HashMap o una lista "con funzionalità di ricerca" di chiavi del servizio. * ''int max_records'': una proprietà che dice il numero massimo di record che il nodo può memorizzare per questo servizio. * ''int my_records_size()'': una funzione per sapere il numero di record per questo servizio attualmente memorizzati dal nodo. * ''bool my_records_contains(Object k)'': una funzione per sapere se il record per la chiave ''k'' per questo servizio è attualmente memorizzato dal nodo. '''void ttl_db_begin(ITemporalDatabaseHandler tdh, List<int> tuple_n)''' * Viene avviata in una tasklet dalla classe del servizio, che deriva !PeerService. . Se si tratta di un servizio opzionale, viene chiamata solo dopo che sono state reperite con successo le mappe dei partecipanti ai servizi opzionali. . Gli argomenti del metodo ''ttl_db_begin'' sono: * ''tdh'': istanza di una classe che implementa ITemporalDatabaseHandler sopra descritta. * ''tuple_n'': una tupla con tutte le posizioni del nodo da 0 a ''levels'' - 1. * ''timer_non_exhaustive'' = un timer che scade dopo ''tdh.msec_ttl'' millisecondi. * ''removed_keys'' = new !ArrayList<Object>(''tdh.key_equal_data''): una lista vuota di chiavi cancellate su richiesta. * ''retrieving_keys'' = new !HashMap<Object,INtkdChannel>(''tdh.key_equal_data'', ''tdh.key_hash_data''): una mappa vuota di chiavi in corso di recupero associate ad un canale di comunicazione tra tasklet. * ''IPeersRequest r'' = new !RequestSendKeys(). . '''Nota: spostare sulla descrizione di ''ttl_db_got_request''.''' Chi riceve tale richiesta risponde subito con tutte le chiavi ''k'' ∈ ''my_records.keys''. * Try: * ''respondant'' = null. * Esegue ''IPeersResponse ret = contact_peer(tdh.p_id, tuple_n, r, tdh.timeout_exec, True, out respondant)''. . Il valore restituito dovrebbe essere un !RequestSendKeysResponse, cioè una lista di Object. Altrimenti la risposta viene ignorata. * Se ''ret'' è una istanza di !RequestSendKeysResponse: * Per ogni chiave ''k'' in ''ret'': * Se ''tdh.is_valid_key(k)'': * Se '''not''' ''my_records_contains(k)'' '''e''' ''k'' ∉ ''removed_keys'' '''e''' '''not''' ''retrieving_keys.has_key(k)'': * # Non sa nulla di ''k''. * Se dist(h~-,,p,,-~(k), n) < dist(h~-,,p,,-~(k), respondant): * Avvia il recupero di ''k''. * Attendi qualche istante per non gravare sulle prestazioni della rete. * Calcola ''l_n0'' = livello del massimo distinto g-nodo di ''respondant''. * Calcola ''p_n0'' = posizione del massimo distinto g-nodo di ''respondant''. * ''tuple_n'' = una tupla con tutte le posizioni del nodo da 0 a ''l_n0'' inclusi. * Prepara ''exclude_tuple_list'' = [] una lista di istanze di tuple globali nel g-nodo di ricerca di livello ''l_n0'' + 1. * Per ''i'' da 0 a ''gsize[l_n0]'' - 1: Se ''i'' ≠ ''p_n0'': Metti in ''exclude_tuple_list'' il g-nodo (''l_n0'', ''i''). * Metti in ''exclude_tuple_list'' il nodo ''respondant''. * While ''my_records_size()'' + ''retrieving_keys.size'' < ''max_records'': * # la memoria destinata da ''n'' al servizio ''p'' non è esaurita. * Attendi qualche istante per non gravare sulle prestazioni della rete. * Esegue ''ret = contact_peer(p_id, tuple_n, r, tdh.timeout_exec, True, out respondant, exclude_tuple_list)''. . Il valore restituito dovrebbe essere un !RequestSendKeysResponse, cioè una lista di Object. Altrimenti la risposta viene ignorata. * Se ''ret'' è una istanza di !RequestSendKeysResponse: * Per ogni chiave ''k'' in ''ret'': * Se ''is_valid_key(k)'': * Se ''k'' ∉ ''my_records.keys'' '''e''' ''k'' ∉ ''removed_keys'' '''e''' ''k'' ∉ ''retrieving_keys.keys'': * # Non sa nulla di ''k''. * Se dist(h~-,,p,,-~(k), n) < dist(h~-,,p,,-~(k), respondant): * Avvia il recupero di ''k''. * Attendi qualche istante per non gravare sulle prestazioni della rete. * Metti in ''exclude_tuple_list'' il nodo ''respondant''. * Se riceve !PeersNoParticipantsInNetworkError: # L'algoritmo termina. === ttl_db_got_request === '''IPeersResponse ttl_db_got_request(IPeersRequest r, Object k) throws !PeersRefuseExecutionError''' * Quando riceve una richiesta ''r'' per la chiave ''k'', la classe !PeerService del servizio ''p'' fa queste operazioni: * Se ''timer_non_exhaustive'' '''non''' è espirato: * Se ''k'' ∈ ''my_records.keys'': * Se ''r'' è di scrittura (o di lettura+scrittura) e prevede la cancellazione di ''k'': * Elabora normalmente la richiesta ''r'', rimuovendo l'elemento da ''my_records''. * Mette ''k'' in ''removed_keys''. * # L'algoritmo termina. * Altrimenti: * Elabora normalmente la richiesta ''r'', non sono previste variazioni in ''my_records.keys''. * # L'algoritmo termina. * Altrimenti-Se ''k'' ∈ ''removed_keys'': * Se ''r'' è di lettura o lettura+scrittura: * Restituisce al richiedente l'eccezione NOT_FOUND. * # L'algoritmo termina. * Altrimenti: * # ''r'' è di scrittura: * Rimuove ''k'' da ''removed_keys''. * Se la memoria destinata da ''n'' al servizio ''p'' non è esaurita (considerando ''my_records.size'' + ''retrieving_keys.size''): * Elabora normalmente la richiesta ''r'', aggiungendo un record in ''my_records[k]''. * # L'algoritmo termina. * Altrimenti: * Resetta il tempo di ''timer_non_exhaustive'' a TTL. * Rifiuta di elaborare ''r'' (perché ''out of memory''). * # L'algoritmo termina. * Altrimenti-Se ''k'' ∈ ''retrieving_keys.keys'': * Se ''r'' è di sola lettura: * Rifiuta di elaborare ''r'' (perché ''non esaustivo''). * # L'algoritmo termina. * Altrimenti: * # ''r'' è di scrittura o lettura+scrittura: * Attende di ricevere una comunicazione dal canale ''retrieving_keys[k]''. * Se ''r'' prevede una lettura prima della scrittura: * Se ''k'' ∉ ''my_records.keys'': * # Il nodo non ha il record ancora. * Mette ''k'' in ''removed_keys''. * Restituisce al richiedente l'eccezione NOT_FOUND. * # L'algoritmo termina. * Altrimenti: * Elabora normalmente la richiesta ''r''. * # L'algoritmo termina. * Altrimenti: * Elabora normalmente la richiesta ''r''. * # L'algoritmo termina. * Altrimenti: * # Non sa nulla di ''k''. * Se la memoria destinata da ''n'' al servizio ''p'' non è esaurita (considerando ''my_records.size'' + ''retrieving_keys.size''): * Mette un nuovo canale in ''retrieving_keys[k]''. * In una nuova tasklet: * Avvia il recupero di ''k''. * Resetta il tempo di ''timer_non_exhaustive'' a TTL. * Rifiuta di elaborare ''r'' (perché ''non esaustivo''). * # L'algoritmo termina. * Altrimenti: * # È ''esaustivo''. * Svuota la lista ''removed_keys''. * Se ''k'' ∈ ''my_records.keys'': * Elabora normalmente la richiesta ''r''. * # L'algoritmo termina. * Altrimenti: * Se ''r'' è di sola lettura o di lettura+scrittura: * Restituisce al richiedente l'eccezione NOT_FOUND. * # L'algoritmo termina. * Altrimenti: * # ''r'' è di scrittura: * Se la memoria destinata da ''n'' al servizio ''p'' non è esaurita (considerando ''my_records.size'' + ''retrieving_keys.size''): * Elabora normalmente la richiesta ''r'', aggiungendo un record in ''my_records[k]''. * # L'algoritmo termina. * Altrimenti: * Resetta il tempo di ''timer_non_exhaustive'' a TTL. * Rifiuta di elaborare ''r'' (perché ''out of memory''). * # L'algoritmo termina. === ttl_db_retrieve_record === '''internal void ttl_db_retrieve_record(Object k)''' * Quando vuole recuperare il record per la chiave ''k'', la classe !PeerService del servizio ''p'' fa queste operazioni: * Prepara la richiesta ''r'' che dice "wait_then_send_record(k)". * '''Nota''': chi riceve una richiesta "wait_then_send_record(k)" la tratta come una normale richiesta di sola lettura per la chiave ''k'', a parte l'attesa. * Try: * Esegue ''ret = contact_peer(p_id, peer_tuple(k), r, timeout_exec, True, out respondant)''. * Se ''ret'' = !NotFoundError: * Se ''k'' ∉ ''removed_keys'': * Aggiunge ''k'' a ''removed_keys''. * Atomicamente (senza schedulare altre tasklet) invia un messaggio a tutti quelli che sono in attesa su ''retrieving_keys[k]'' e rimuove ''k'' da ''retrieving_keys''. * Altrimenti: * Scrive ''ret'' in ''my_records[k]''. * Rimuove ''k'' da ''removed_keys'', se c'era. * Atomicamente (senza schedulare altre tasklet) invia un messaggio a tutti quelli che sono in attesa su ''retrieving_keys[k]'' e rimuove ''k'' da ''retrieving_keys''. * Se riceve !PeersNoParticipantsInNetworkError: * Se ''k'' ∉ ''removed_keys'': * Aggiunge ''k'' a ''removed_keys''. * Atomicamente (senza schedulare altre tasklet) invia un messaggio a tutti quelli che sono in attesa su ''retrieving_keys[k]'' e rimuove ''k'' da ''retrieving_keys''. * Se riceve !PeersRefuseExecutionError: * Se ''k'' ∉ ''removed_keys'': * Aggiunge ''k'' a ''removed_keys''. * Atomicamente (senza schedulare altre tasklet) invia un messaggio a tutti quelli che sono in attesa su ''retrieving_keys[k]'' e rimuove ''k'' da ''retrieving_keys''. === fixed_keys_db_begin === '''void fixed_keys_db_begin(List<Object> K)''' * Quando il bootstrap è completo, la classe !PeerService del servizio ''p'' fa queste operazioni: * Crea una lista vuota di chiavi ''k'' di cui il recupero è ancora da iniziare, ''not_started_keys''. * Crea una mappa vuota di chiavi ''k'' in corso di recupero associate ad un canale di comunicazione tra tasklet, ''retrieving_keys''. * Per ogni chiave ''k'' nell'insieme completo ''K'': * Aggiunge ''k'' a ''not_started_keys''. * Per ogni chiave ''k'' nell'insieme completo ''K'': * Sia ''respondant'' il detentore corrente di ''k''. Lo trova con una chiamata a contact_peer in cui non chiede nulla. * Se dist(h~-,,p,,-~(k), n) < dist(h~-,,p,,-~(k), respondant): * Mette un nuovo canale in ''retrieving_keys[k]''. * In una nuova tasklet: * Avvia il recupero di ''k''. * Rimuove ''k'' da ''not_started_keys''. * Opzionalmente, a seconda di quante sono le chiavi, attendi qualche istante per non gravare sulle prestazioni della rete. * Altrimenti: * Rimuove ''k'' da ''not_started_keys''. === fixed_keys_db_got_request === '''IPeersResponse fixed_keys_db_got_request(IPeersRequest r, Object k) throws !PeersRefuseExecutionError''' * Quando riceve una richiesta ''r'' per la chiave ''k'', la classe !PeerService del servizio ''p'' fa queste operazioni: * Se k ∈ not_started_keys: * Rifiuta di elaborare ''r'' (perché ''non esaustivo''). * Se k ∈ retrieving_keys.keys: * Se ''r'' è di sola lettura: * Rifiuta di elaborare ''r'' (perché ''non esaustivo''). * # L'algoritmo termina. * Altrimenti: * # ''r'' è di scrittura: * Attende di ricevere una comunicazione dal canale ''retrieving_keys[k]''. * Elabora normalmente la richiesta ''r''. * # L'algoritmo termina. * Altrimenti: * Elabora normalmente la richiesta ''r''. * # L'algoritmo termina. === fixed_keys_db_retrieve_record === '''internal void fixed_keys_db_retrieve_record(Object k)''' * Quando vuole recuperare il record per la chiave ''k'', la classe !PeerService del servizio ''p'' fa queste operazioni: * Prepara la richiesta ''r'' che dice "wait_then_send_record(k)". * '''Nota''': chi riceve una richiesta "wait_then_send_record(k)" la tratta come una normale richiesta di sola lettura per la chiave ''k'', a parte l'attesa. * Try: * Esegue ''ret = contact_peer(p_id, peer_tuple(k), r, timeout_exec, True, out respondant)''. * Scrive ''ret'' in ''my_records[k]''. * Se riceve !PeersNoParticipantsInNetworkError oppure !PeersRefuseExecutionError: * Valorizza ''my_records[k]'' con un appropriato default. * Atomicamente (senza schedulare altre tasklet) invia un messaggio a tutti quelli che sono in attesa su ''retrieving_keys[k]'' e rimuove ''k'' da ''retrieving_keys''. | 
Modulo PeerServices - Appunti - Algoritmi 2 / 2
Contents
Algoritmo di rilevamento di non partecipazione
check_non_participation
bool check_non_participation(p_id, lvl, _pos)
- In questo algoritmo non ci interessa sapere se un g-nodo partecipa, ma solo se è possibile dire con certezza che esso non partecipa. In caso di incertezza l'algoritmo restituisce False.
- Produci x̄ = la tupla x̄0·x̄1·...·x̄lvl-1 dove x̄i = 0 per ogni i. La tupla identifica un indirizzo a caso all'interno del g-nodo g = (lvl, _pos). Se lvl = 0 allora x̄ è null. 
- Produci n = make_tuple_node(new HCoord(0, pos[0]), lvl+1) , cioè la tupla n0·n1·...·nlvl. La tupla che identifica il nodo corrente nel g-nodo di livello lvl+1 in cui il messaggio si muoverà. 
- m’ = new PeerMessageForwarder. 
- m’.n = n. 
- m’.x̄ = x̄. 
- m’.lvl = lvl. 
- m’.pos = _pos. 
- m’.p_id = p_id. 
- m’.msg_id = un identificativo generato a caso per questo messaggio. 
- Calcola timeout_instradamento = f ( map_paths.i_peers_get_nodes_in_my_group(lvl + 1) ). 
- Prepara waiting_answer = new WaitingAnswer(null, (lvl,_pos) come PeerTupleGNode con top = lvl+1). Il fatto che l'istanza di IPeersRequest è a null fa in modo che i metodi remoti che ricevono le notifiche si comportano in modo adeguato. Sostanzialmente dovrebbe cambiare solo il fatto che quando si riceve la segnalazione di get_request si risponde sempre con l'eccezione PeersUnknownMessageError, ache se si è potuto recuperare l'istanza di WaitingAnswer. Sull'istanza di WaitingAnswer viene poi valorizzato il membro response con qualcosa diverso da null solo per indicare che il g-nodo partecipa. 
- waiting_answer_map[m’.msg_id] = waiting_answer. 
- IPeersManagerStub gwstub 
- IPeersManagerStub? failed = null 
- While True: - Try: - Calcola gwstub = map_paths.i_peers_gateway(lvl, _pos, null, failed)
 
- Se riceve l'eccezione PeersNonexistentDestinationError: - Restituisci True. Rimuovi waiting_answer_map[m’.msg_id]. Termina algoritmo.
 
- Try: - Esegue gwstub.forward_peer_message(m’).
 
- Se riceve StubError o DeserializeError: - failed = gwstub.
- Continua con la prossima iterazione del ciclo.
 
- Esci dal ciclo.
 
- Try: 
- Try: - Sta in attesa su waiting_answer.ch per max timeout_instradamento.
- Se waiting_answer.exclude_gnode ≠ null: - Restituisce False. Rimuovi waiting_answer_map[m’.msg_id]. Termina algoritmo.
 
- Altrimenti-Se waiting_answer.non_participant_gnode ≠ null: - # significa che abbiamo ricevuto notizia di un gnodo non partecipante.
- waiting_answer.non_participant_gnode è un PeerTupleGNode che rappresenta un g-nodo h dentro il mio g-nodo di livello top. 
- Se è visibile nella mia mappa, cioè se (lvl,top) non partecipa: - Restituisci True. Rimuovi waiting_answer_map[m’.msg_id]. Termina algoritmo.
 
- Altrimenti: - Restituisce False. Rimuovi waiting_answer_map[m’.msg_id]. Termina algoritmo.
 
 
- Altrimenti-Se waiting_answer.response ≠ null: - # significa che abbiamo ricevuto il contatto e che lvl,_pos partecipa.
- Restituisce False. Rimuovi waiting_answer_map[m’.msg_id]. Termina algoritmo.
 
- Altrimenti: - # significa che abbiamo ricevuto un nuovo valore in waiting_answer.min_target.
- Restituisce False. Rimuovi waiting_answer_map[m’.msg_id]. Termina algoritmo.
 
 
- Se riceve l'eccezione TimeoutError: - # dobbiamo trattare waiting_answer.min_target come da escludere.
- Restituisce False. Rimuovi waiting_answer_map[m’.msg_id]. Termina algoritmo.
 
Algoritmo di divulgazione della partecipazione
publish_my_participation
void publish_my_participation(p_id)
- gn = make_tuple_gnode(new HCoord(0, pos[0]), levels). La tupla n0·n1·...·nlevels-1, che identifica il nodo corrente nella rete. 
- tempo_attesa = 300 secondi. 
- iterazioni = 5. 
- While True (per sempre): - Se iterazioni > 0: - Decrementa iterazioni di 1.
 
- Altrimenti: - tempo_attesa = 1 giorno + random(1..24*60*60) secondi.
 
- Prepara un IPeersMissingArcHandler missing_handler che in caso di invocazione esegua: - Calcola tcp_stub = neighbors_factory.i_peers_get_tcp(missing_arc).
- Try: - tcp_stub.set_participant(p_id, gn).
 
- Se riceve StubError o DeserializeError: - Ignora.
 
 
- Calcola br_stub = neighbors_factory.i_peers_get_broadcast(missing_handler).
- Try: - br_stub.set_participant(p_id, gn).
 
- Se riceve StubError o DeserializeError: - Ignora.
 
- Aspetta tempo_attesa.
 
set_participant
void set_participant(int p_id, PeerTupleGNode gn)
- E' già stata istanziata lista_recenti un ArrayList di HCoord. 
- Se  services.has_key(p_id) AND NOT services[p_id].p_is_optional: - Ignora il messaggio. Algoritmo termina.
 
- int case, HCoord ret. 
- Calcola convert_tuple_gnode(gn, out case, out ret).
- Se case = 1: - Cioè gn rappresenta un mio g-nodo.
- Ignora il messaggio. Algoritmo termina.
 
- Altrimenti: - Cioè gn rappresenta un g-nodo a cui io non appartengo, ed ho già calcolato in ret il g-nodo visibile nella mia topologia in cui gn si trova.
- Se ret ∈ lista_recenti: - Ignora il messaggio. Algoritmo termina.
 
- Altrimenti: - lista_recenti.add(ret).
- Se NOT participant_maps.has_key(p_id): - participant_maps[p_id] = new ParticipantMap(). 
 
- participant_maps[p_id].participant_list.add(ret).
- ret_gn = make_tuple_gnode(ret, levels) 
- Prepara un IPeersMissingArcHandler missing_handler che in caso di invocazione esegua: - Calcola tcp_stub = neighbors_factory.i_peers_get_tcp(missing_arc).
- Try: - tcp_stub.set_participant(p_id, ret_gn).
 
- Se riceve StubError o DeserializeError: - Ignora.
 
 
- Calcola br_stub = neighbors_factory.i_peers_get_broadcast(missing_handler).
- Try: - br_stub.set_participant(p_id, ret_gn).
 
- Se riceve StubError o DeserializeError: - Ignora.
 
- Svolgi quanto segue in una nuova tasklet portando dietro ret: - Aspetta 60 secondi.
- lista_recenti.remove(ret).
 
 
 
Algoritmo di mantenimento di un database distribuito
begin_replica
bool begin_replica(q, p_id, x̄, r, timeout_exec, out IPeersResponse? resp, out IPeersContinuation cont)
- Gli argomenti sono: - int q: il numero delle repliche richieste, 
- int p_id, 
- PeerTupleNode x̄: la tupla dell'hash della chiave del record, cioè hp ( k ), 
- IPeersRequest r: la richiesta di replicare la coppia k,val , 
- int timeout_exec, 
- resp viene valorizzato con la risposta o null; 
- cont è un oggetto di cui all'esterno si sa solo che implementa l'interfaccia vuota IPeersContinuation. 
 
- lista_repliche = new List di PeerTupleNode. 
- exclude_tuple_list = new PeerTupleGNodeContainer(x̄.tuple.size). 
- cont = {q, p_id, x̄, r, timeout_exec, lista_repliche, exclude_tuple_list}.
- Return next_replica(cont, out resp).
next_replica
bool next_replica(IPeersContinuation cont, out IPeersResponse? resp)
- resp = null.
- Se cont.lista_repliche.size ≥ cont.q: - Return False.
 
- PeerTupleNode respondant; 
- ret = contact_peer(cont.p_id, cont.x̄, cont.r, cont.timeout_exec, True, out respondant, cont.exclude_tuple_list).
- Se si riceve l'eccezione PeersNoParticipantsInNetworkError: - Return False.
 
- resp = ret.
- aggiungi respondant a cont.lista_repliche.
- aggiungi respondant a cont.exclude_tuple_list.
- Return cont.lista_repliche.size < cont.q. 
ttl_db_begin
I metodi ttl_db_* servono a gestire un database distribuito in cui i record hanno una scadenza, o TTL. Il modulo, oltre a fornire tali metodi, fornisce la classe RequestSendKeys, la classe RequestSendKeysResponse, l'interfaccia ITemporalDatabaseHandler.
Un presupposto, comune ai database temporali e ai database fixed_keys, è che sia le chiavi sia i valori dei record siano istanze di classi derivate da Object e siano serializzabili.
La classe RequestSendKeys è una classe serializzabile, che deriva da Object, e non ha alcun dato al suo interno. Implementa l'interfaccia (vuota) IPeersRequest. È la richiesta di inviare tutte le chiavi memorizzate.
La classe RequestSendKeysResponse è una classe serializzabile, che deriva da Object, e contiene una lista di istanze di Object serializzabili. Implementa l'interfaccia (vuota) IPeersResponse. È la risposta alla richiesta di inviare tutte le chiavi memorizzate.
L'interfaccia ITemporalDatabaseHandler espone questi metodi:
- int p_id: una proprietà che dice l'identificativo di questo servizio. 
- int timeout_exec: una proprietà che dice il tempo massimo di esecuzione per la richiesta RequestSendKeys per questo servizio. 
- int msec_ttl: una proprietà che dice il numero di millisecondi prima che un record di questo servizio scada. 
- bool is_valid_key(Object k): una funzione che dice se k è una chiave valida per il servizio. 
- bool key_equal_data(Object k1, Object k2) e uint key_hash_data(Object k): metodi le cui firme sono adatte per i delegati Gee.EqualDataFunc<Object> e Gee.HashDataFunc<Object>, per costruire una HashMap o una lista "con funzionalità di ricerca" di chiavi del servizio. 
- int max_records: una proprietà che dice il numero massimo di record che il nodo può memorizzare per questo servizio. 
- int my_records_size(): una funzione per sapere il numero di record per questo servizio attualmente memorizzati dal nodo. 
- bool my_records_contains(Object k): una funzione per sapere se il record per la chiave k per questo servizio è attualmente memorizzato dal nodo. 
void ttl_db_begin(ITemporalDatabaseHandler tdh, List<int> tuple_n)
- Viene avviata in una tasklet dalla classe del servizio, che deriva PeerService. 
- Se si tratta di un servizio opzionale, viene chiamata solo dopo che sono state reperite con successo le mappe dei partecipanti ai servizi opzionali.
- Gli argomenti del metodo ttl_db_begin sono: - tdh: istanza di una classe che implementa ITemporalDatabaseHandler sopra descritta. 
- tuple_n: una tupla con tutte le posizioni del nodo da 0 a levels - 1. 
 
- timer_non_exhaustive = un timer che scade dopo tdh.msec_ttl millisecondi. 
- removed_keys = new ArrayList<Object>(tdh.key_equal_data): una lista vuota di chiavi cancellate su richiesta. 
- retrieving_keys = new HashMap<Object,INtkdChannel>(tdh.key_equal_data, tdh.key_hash_data): una mappa vuota di chiavi in corso di recupero associate ad un canale di comunicazione tra tasklet. 
- IPeersRequest r = new RequestSendKeys(). 
- Nota: spostare sulla descrizione di ttl_db_got_request. Chi riceve tale richiesta risponde subito con tutte le chiavi k ∈ my_records.keys. 
- Try: - respondant = null. 
- Esegue IPeersResponse ret = contact_peer(tdh.p_id, tuple_n, r, tdh.timeout_exec, True, out respondant). 
- Il valore restituito dovrebbe essere un RequestSendKeysResponse, cioè una lista di Object. Altrimenti la risposta viene ignorata. 
- Se ret è una istanza di RequestSendKeysResponse: - Per ogni chiave k in ret: - Se tdh.is_valid_key(k): - Se not my_records_contains(k) e k ∉ removed_keys e not retrieving_keys.has_key(k): - # Non sa nulla di k. 
- Se dist(hp(k), n) < dist(hp(k), respondant): - Avvia il recupero di k. 
- Attendi qualche istante per non gravare sulle prestazioni della rete.
 
 
 
 
 
- Calcola l_n0 = livello del massimo distinto g-nodo di respondant. 
- Calcola p_n0 = posizione del massimo distinto g-nodo di respondant. 
- tuple_n = una tupla con tutte le posizioni del nodo da 0 a l_n0 inclusi. 
- Prepara exclude_tuple_list = [] una lista di istanze di tuple globali nel g-nodo di ricerca di livello l_n0 + 1. 
- Per i da 0 a gsize[l_n0] - 1: Se i ≠ p_n0: Metti in exclude_tuple_list il g-nodo (l_n0, i). 
- Metti in exclude_tuple_list il nodo respondant. 
- While my_records_size() + retrieving_keys.size < max_records: - # la memoria destinata da n al servizio p non è esaurita. 
- Attendi qualche istante per non gravare sulle prestazioni della rete.
- Esegue ret = contact_peer(p_id, tuple_n, r, tdh.timeout_exec, True, out respondant, exclude_tuple_list). 
- Il valore restituito dovrebbe essere un RequestSendKeysResponse, cioè una lista di Object. Altrimenti la risposta viene ignorata. 
- Se ret è una istanza di RequestSendKeysResponse: - Per ogni chiave k in ret: - Se is_valid_key(k): - Se k ∉ my_records.keys e k ∉ removed_keys e k ∉ retrieving_keys.keys: - # Non sa nulla di k. 
- Se dist(hp(k), n) < dist(hp(k), respondant): - Avvia il recupero di k. 
- Attendi qualche istante per non gravare sulle prestazioni della rete.
 
 
 
 
 
- Metti in exclude_tuple_list il nodo respondant. 
 
 
- Se riceve PeersNoParticipantsInNetworkError: - # L'algoritmo termina.
 
ttl_db_got_request
IPeersResponse ttl_db_got_request(IPeersRequest r, Object k) throws PeersRefuseExecutionError
- Quando riceve una richiesta r per la chiave k, la classe PeerService del servizio p fa queste operazioni: - Se timer_non_exhaustive non è espirato: - Se k ∈ my_records.keys: - Se r è di scrittura (o di lettura+scrittura) e prevede la cancellazione di k: - Elabora normalmente la richiesta r, rimuovendo l'elemento da my_records. 
- Mette k in removed_keys. 
- # L'algoritmo termina.
 
- Altrimenti: - Elabora normalmente la richiesta r, non sono previste variazioni in my_records.keys. 
- # L'algoritmo termina.
 
 
- Altrimenti-Se k ∈ removed_keys: - Se r è di lettura o lettura+scrittura: - Restituisce al richiedente l'eccezione NOT_FOUND.
- # L'algoritmo termina.
 
- Altrimenti: - # r è di scrittura: 
- Rimuove k da removed_keys. 
- Se la memoria destinata da n al servizio p non è esaurita (considerando my_records.size + retrieving_keys.size): - Elabora normalmente la richiesta r, aggiungendo un record in my_records[k]. 
- # L'algoritmo termina.
 
- Altrimenti: - Resetta il tempo di timer_non_exhaustive a TTL. 
- Rifiuta di elaborare r (perché out of memory). 
- # L'algoritmo termina.
 
 
 
- Altrimenti-Se k ∈ retrieving_keys.keys: - Se r è di sola lettura: - Rifiuta di elaborare r (perché non esaustivo). 
- # L'algoritmo termina.
 
- Altrimenti: - # r è di scrittura o lettura+scrittura: 
- Attende di ricevere una comunicazione dal canale retrieving_keys[k]. 
- Se r prevede una lettura prima della scrittura: - Se k ∉ my_records.keys: - # Il nodo non ha il record ancora.
- Mette k in removed_keys. 
- Restituisce al richiedente l'eccezione NOT_FOUND.
- # L'algoritmo termina.
 
- Altrimenti: - Elabora normalmente la richiesta r. 
- # L'algoritmo termina.
 
 
- Altrimenti: - Elabora normalmente la richiesta r. 
- # L'algoritmo termina.
 
 
 
- Altrimenti: - # Non sa nulla di k. 
- Se la memoria destinata da n al servizio p non è esaurita (considerando my_records.size + retrieving_keys.size): - Mette un nuovo canale in retrieving_keys[k]. 
- In una nuova tasklet: - Avvia il recupero di k. 
 
 
- Resetta il tempo di timer_non_exhaustive a TTL. 
- Rifiuta di elaborare r (perché non esaustivo). 
- # L'algoritmo termina.
 
 
- Altrimenti: - # È esaustivo. 
- Svuota la lista removed_keys. 
- Se k ∈ my_records.keys: - Elabora normalmente la richiesta r. 
- # L'algoritmo termina.
 
- Altrimenti: - Se r è di sola lettura o di lettura+scrittura: - Restituisce al richiedente l'eccezione NOT_FOUND.
- # L'algoritmo termina.
 
- Altrimenti: - # r è di scrittura: 
- Se la memoria destinata da n al servizio p non è esaurita (considerando my_records.size + retrieving_keys.size): - Elabora normalmente la richiesta r, aggiungendo un record in my_records[k]. 
- # L'algoritmo termina.
 
- Altrimenti: - Resetta il tempo di timer_non_exhaustive a TTL. 
- Rifiuta di elaborare r (perché out of memory). 
- # L'algoritmo termina.
 
 
 
 
 
ttl_db_retrieve_record
internal void ttl_db_retrieve_record(Object k)
- Quando vuole recuperare il record per la chiave k, la classe PeerService del servizio p fa queste operazioni: - Prepara la richiesta r che dice "wait_then_send_record(k)". 
- Nota: chi riceve una richiesta "wait_then_send_record(k)" la tratta come una normale richiesta di sola lettura per la chiave k, a parte l'attesa. 
- Try: - Esegue ret = contact_peer(p_id, peer_tuple(k), r, timeout_exec, True, out respondant). 
- Se ret = NotFoundError: - Se k ∉ removed_keys: - Aggiunge k a removed_keys. 
 
- Atomicamente (senza schedulare altre tasklet) invia un messaggio a tutti quelli che sono in attesa su retrieving_keys[k] e rimuove k da retrieving_keys. 
 
- Altrimenti: - Scrive ret in my_records[k]. 
- Rimuove k da removed_keys, se c'era. 
- Atomicamente (senza schedulare altre tasklet) invia un messaggio a tutti quelli che sono in attesa su retrieving_keys[k] e rimuove k da retrieving_keys. 
 
 
- Se riceve PeersNoParticipantsInNetworkError: - Se k ∉ removed_keys: - Aggiunge k a removed_keys. 
 
- Atomicamente (senza schedulare altre tasklet) invia un messaggio a tutti quelli che sono in attesa su retrieving_keys[k] e rimuove k da retrieving_keys. 
 
- Se riceve PeersRefuseExecutionError: - Se k ∉ removed_keys: - Aggiunge k a removed_keys. 
 
- Atomicamente (senza schedulare altre tasklet) invia un messaggio a tutti quelli che sono in attesa su retrieving_keys[k] e rimuove k da retrieving_keys. 
 
 
fixed_keys_db_begin
void fixed_keys_db_begin(List<Object> K)
- Quando il bootstrap è completo, la classe PeerService del servizio p fa queste operazioni: - Crea una lista vuota di chiavi k di cui il recupero è ancora da iniziare, not_started_keys. 
- Crea una mappa vuota di chiavi k in corso di recupero associate ad un canale di comunicazione tra tasklet, retrieving_keys. 
- Per ogni chiave k nell'insieme completo K: - Aggiunge k a not_started_keys. 
 
- Per ogni chiave k nell'insieme completo K: - Sia respondant il detentore corrente di k. Lo trova con una chiamata a contact_peer in cui non chiede nulla. 
- Se dist(hp(k), n) < dist(hp(k), respondant): - Mette un nuovo canale in retrieving_keys[k]. 
- In una nuova tasklet: - Avvia il recupero di k. 
 
- Rimuove k da not_started_keys. 
- Opzionalmente, a seconda di quante sono le chiavi, attendi qualche istante per non gravare sulle prestazioni della rete.
 
- Altrimenti: - Rimuove k da not_started_keys. 
 
 
 
fixed_keys_db_got_request
IPeersResponse fixed_keys_db_got_request(IPeersRequest r, Object k) throws PeersRefuseExecutionError
- Quando riceve una richiesta r per la chiave k, la classe PeerService del servizio p fa queste operazioni: - Se k ∈ not_started_keys: - Rifiuta di elaborare r (perché non esaustivo). 
 
- Se k ∈ retrieving_keys.keys: - Se r è di sola lettura: - Rifiuta di elaborare r (perché non esaustivo). 
- # L'algoritmo termina.
 
- Altrimenti: - # r è di scrittura: 
- Attende di ricevere una comunicazione dal canale retrieving_keys[k]. 
- Elabora normalmente la richiesta r. 
- # L'algoritmo termina.
 
 
- Altrimenti: - Elabora normalmente la richiesta r. 
- # L'algoritmo termina.
 
 
- Se k ∈ not_started_keys: 
fixed_keys_db_retrieve_record
internal void fixed_keys_db_retrieve_record(Object k)
- Quando vuole recuperare il record per la chiave k, la classe PeerService del servizio p fa queste operazioni: - Prepara la richiesta r che dice "wait_then_send_record(k)". 
- Nota: chi riceve una richiesta "wait_then_send_record(k)" la tratta come una normale richiesta di sola lettura per la chiave k, a parte l'attesa. 
- Try: - Esegue ret = contact_peer(p_id, peer_tuple(k), r, timeout_exec, True, out respondant). 
- Scrive ret in my_records[k]. 
 
- Se riceve PeersNoParticipantsInNetworkError oppure PeersRefuseExecutionError: - Valorizza my_records[k] con un appropriato default. 
 
- Atomicamente (senza schedulare altre tasklet) invia un messaggio a tutti quelli che sono in attesa su retrieving_keys[k] e rimuove k da retrieving_keys. 
 
