9348
Comment:
|
20594
|
Deletions are marked like this. | Additions are marked like this. |
Line 1: | Line 1: |
= Modulo PeerServices - Appunti - Algoritmi principali = === Algoritmo di rilevamento non partecipazione === ==== bool check_non_participation(p_id, lvl, _pos) ==== |
= Modulo PeerServices - Appunti - Algoritmi 2 / 2 = <<TableOfContents(4)>> == Algoritmo di rilevamento di non partecipazione == === bool check_non_participation(p_id, lvl, _pos) === |
Line 15: | Line 17: |
* Prepara ''waiting_answer'' = new !WaitingAnswer(null, (lvl,_pos) come PeerTupleGNode con top = lvl+1). ~-Il fatto che l'istanza di !RemoteCall è a null fa in modo che i metodi remoti che ricevono le notifiche si comportano in modo adeguato. Sostanzialmente dovrebbe cambiare solo il fatto che quando si riceve la segnalazione di get_request si risponde sempre con l'eccezione !PeersUnknownMessageError, ache se si è potuto recuperare l'istanza di !WaitingAnswer. Sull'istanza di !WaitingAnswer viene poi valorizzato il membro response con un !SerializableNone solo per indicare che il g-nodo partecipa.-~ | * Prepara ''waiting_answer'' = new !WaitingAnswer(null, (lvl,_pos) come PeerTupleGNode con top = lvl+1). ~-Il fatto che l'istanza di IPeersRequest è a null fa in modo che i metodi remoti che ricevono le notifiche si comportano in modo adeguato. Sostanzialmente dovrebbe cambiare solo il fatto che quando si riceve la segnalazione di get_request si risponde sempre con l'eccezione !PeersUnknownMessageError, ache se si è potuto recuperare l'istanza di !WaitingAnswer. Sull'istanza di !WaitingAnswer viene poi valorizzato il membro response con qualcosa diverso da ''null'' solo per indicare che il g-nodo partecipa.-~ |
Line 17: | Line 19: |
* IAddressManagerRootDispatcher ''gwstub'' * IAddressManagerRootDispatcher? ''failed'' = null |
* IPeersManagerStub ''gwstub'' * IPeersManagerStub? ''failed'' = null |
Line 25: | Line 27: |
* Esegue gwstub.peers_manager.forward_peer_message(m’). * Se riceve l'eccezione RPCError: |
* Esegue gwstub.forward_peer_message(m’). * Se riceve !StubError o !DeserializeError: |
Line 51: | Line 53: |
=== Algoritmo di divulgazione della partecipazione === ==== void publish_my_participation(p_id) ==== |
== Algoritmo di divulgazione della partecipazione == === void publish_my_participation(p_id) === |
Line 64: | Line 66: |
* tcp_stub.peer_manager.set_participant(p_id, gn). * Se riceve eccezione RPCError: ignora. |
* tcp_stub.set_participant(p_id, gn). * Se riceve !StubError o !DeserializeError: * Ignora. |
Line 68: | Line 71: |
* br_stub.peer_manager.set_participant(p_id, gn). * Se riceve eccezione RPCError: ignora. |
* br_stub.set_participant(p_id, gn). * Se riceve !StubError o !DeserializeError: * Ignora. |
Line 72: | Line 76: |
==== void set_participant(int p_id, PeerTupleGNode gn) ==== | === void set_participant(int p_id, PeerTupleGNode gn) === |
Line 94: | Line 98: |
* tcp_stub.peer_manager.set_participant(p_id, ret_gn). * Se riceve eccezione RPCError: ignora. |
* tcp_stub.set_participant(p_id, ret_gn). * Se riceve !StubError o !DeserializeError: * Ignora. |
Line 98: | Line 103: |
* br_stub.peer_manager.set_participant(p_id, ret_gn). * Se riceve eccezione RPCError: ignora. |
* br_stub.set_participant(p_id, ret_gn). * Se riceve !StubError o !DeserializeError: * Ignora. |
Line 104: | Line 110: |
=== Algoritmo di mantenimento di un database distribuito === ==== bool begin_replica(q, p_id, x̄, r, timeout_exec, out ISerializable resp, out IPeersContinuation cont) ==== |
== Algoritmo di mantenimento di un database distribuito == === bool begin_replica(q, p_id, x̄, r, timeout_exec, out IPeersResponse? resp, out IPeersContinuation cont) === |
Line 110: | Line 116: |
* !RemoteCall ''r'': la richiesta di replicare la coppia k,val , | * IPeersRequest ''r'': la richiesta di replicare la coppia k,val , |
Line 115: | Line 121: |
* ''exclude_tuple_list'' = new PeerTupleGNodeContainer. | * ''exclude_tuple_list'' = new PeerTupleGNodeContainer(x̄.tuple.size). |
Line 119: | Line 125: |
==== bool next_replica(IPeersContinuation cont, out ISerializable resp) ==== * resp = new ISerializableNone. |
=== bool next_replica(IPeersContinuation cont, out IPeersResponse? resp) === * resp = null. |
Line 132: | Line 138: |
==== bool begin_retrieve_cache(p_id, r, timeout_exec, out ISerializable resp, out IPeersContinuation cont) ==== * Gli argomenti sono: * int ''p_id'', * !RemoteCall ''r'': la richiesta di ricevere la cache , * int ''timeout_exec'', * ''resp'' viene valorizzato con la risposta o null; * ''cont'' è un oggetto di cui all'esterno si sa solo che implementa l'interfaccia vuota IPeersContinuation. * resp = new ISerializableNone. * Per ''j'' da 0 a levels-1: * !PeerTupleNode ''x̄'' = make_tuple_node(new HCoord(0, pos[0]), j+1) : cioè la tupla dei primi j+1 livelli di n = n~-,,0,,-~·...·n~-,,j,,-~. * ''exclude_tuple_list'' = new PeerTupleGNodeContainer. * !PeerTupleNode ''respondant''; * ret = contact_peer(p_id, x̄, r, timeout_exec, True, out respondant, exclude_tuple_list). * Se si riceve l'eccezione !PeersNoParticipantsInNetworkError: * Continue. Prossima iterazione del ciclo. * aggiungi respondant a exclude_tuple_list. * cont = {p_id, r, timeout_exec, j, exclude_tuple_list}. * resp = ret. * Return True. * Return False. ==== bool next_retrieve_cache(IPeersContinuation cont, out ISerializable resp) ==== * resp = new ISerializableNone. * ''j'' = cont.j. * !PeerTupleNode ''x̄'' = make_tuple_node(new HCoord(0, pos[0]), j+1) : cioè la tupla dei primi j+1 livelli di n = n~-,,0,,-~·...·n~-,,j,,-~. * !PeerTupleNode ''respondant''; * ret = contact_peer(cont.p_id, x̄, cont.r, cont.timeout_exec, True, out respondant, cont.exclude_tuple_list, True). * Se si riceve l'eccezione !PeersNoParticipantsInNetworkError: * Return False. * resp = ret. * Return False. |
=== void ttl_db_begin() === * Viene avviata in una tasklet dalla classe del servizio, che deriva !PeerService. . Se si tratta di un servizio opzionale, viene chiamata solo dopo che sono state reperite con successo le mappe dei partecipanti ai servizi opzionali. . Riceve come primo argomento una istanza di una classe che implementa l'interfaccia ITemporalDatabaseHandler. Tale classe implementa le operazioni che servono a gestire un database distribuito in cui i record hanno una scadenza, o TTL. Un presupposto è che sia le chiavi sia i valori dei record siano istanze di classi derivate da ''Object'' e siano serializzabili. . L'interfaccia ITemporalDatabaseHandler espone questi metodi: * ''int msec_ttl'': una proprietà che dice il numero di millisecondi prima che un record scada. * ''bool is_valid_key(Object k)'': una funzione che dice se ''k'' è una chiave valida per il servizio. * ''bool key_equal_data(Object k1, Object k2)'' e ''uint key_hash_data(Object k)'': metodi le cui firme sono adatte per i delegati ''Gee.!EqualDataFunc<Object>'' e ''Gee.!HashDataFunc<Object>'', per costruire una !HashMap o una lista "con funzionalità di ricerca" di chiavi del servizio. * ''int max_records'': una proprietà che dice il numero massimo di record che il nodo può memorizzare per questo servizio. * ''int my_records_size()'': una funzione per sapere il numero di records attualmente memorizzati dal nodo. . Gli argomenti del metodo ''ttl_db_begin'' sono: * ''tdh'': istanza di ITemporalDatabaseHandler sopra descritta. * ''tuple_n'': una tupla con tutte le posizioni del nodo da 0 a ''levels'' - 1. * ''timer_non_exhaustive'' = un timer che scade dopo ''tdh.msec_ttl'' millisecondi. * ''removed_keys'' = new !ArrayList<Object>(tdh.key_equal_data): una lista vuota di chiavi cancellate su richiesta. * ''retrieving_keys'' = new !HashMap<Object,INtkdChannel>(tdh.key_equal_data, tdh.key_hash_data): una mappa vuota di chiavi in corso di recupero associate ad un canale di comunicazione tra tasklet. * ''IPeersRequest r'' = new !RequestSendKeys(): la richiesta che dice di inviare tutte le chiavi memorizzate. . '''Nota: spostare sulla descrizione di ''ttl_db_got_request''.''' Chi riceve tale richiesta risponde subito con tutte le chiavi ''k'' ∈ ''my_records.keys''. * ''timeout_exec'' = tempo massimo di esecuzione per ''r''. Una costante. * Try: * ''respondant'' = null. * Esegue ''IPeersResponse ret = contact_peer(p_id, tuple_n, r, timeout_exec, True, out respondant)''. . Il valore restituito dovrebbe essere un !RequestSendKeysResponse, cioè una lista di Object. Altrimenti la risposta viene ignorata. * Se ''ret'' è una istanza di !RequestSendKeysResponse: * Per ogni chiave ''k'' in ''ret'': * Se ''is_valid_key(k)'': * Se ''k'' ∉ ''my_records.keys'' '''e''' ''k'' ∉ ''removed_keys'' '''e''' ''k'' ∉ ''retrieving_keys.keys'': * # Non sa nulla di ''k''. * Se dist(h~-,,p,,-~(k), n) < dist(h~-,,p,,-~(k), respondant): * Avvia il recupero di ''k''. * Attendi qualche istante per non gravare sulle prestazioni della rete. * Calcola ''l_n0'' = livello del massimo distinto g-nodo di ''respondant''. * Calcola ''p_n0'' = posizione del massimo distinto g-nodo di ''respondant''. * ''tuple_n'' = una tupla con tutte le posizioni del nodo da 0 a ''l_n0'' inclusi. * Prepara ''exclude_tuple_list'' = [] una lista di istanze di tuple globali nel g-nodo di ricerca di livello ''l_n0'' + 1. * Per ''i'' da 0 a ''gsize[l_n0]'' - 1: Se ''i'' ≠ ''p_n0'': Metti in ''exclude_tuple_list'' il g-nodo (''l_n0'', ''i''). * Metti in ''exclude_tuple_list'' il nodo ''respondant''. * While ''my_records_size()'' + ''retrieving_keys.size'' < ''max_records'': * # la memoria destinata da ''n'' al servizio ''p'' non è esaurita. * Attendi qualche istante per non gravare sulle prestazioni della rete. * Esegue ''ret = contact_peer(p_id, tuple_n, r, timeout_exec, True, out respondant, exclude_tuple_list)''. . Il valore restituito dovrebbe essere un !RequestSendKeysResponse, cioè una lista di Object. Altrimenti la risposta viene ignorata. * Se ''ret'' è una istanza di !RequestSendKeysResponse: * Per ogni chiave ''k'' in ''ret'': * Se ''is_valid_key(k)'': * Se ''k'' ∉ ''my_records.keys'' '''e''' ''k'' ∉ ''removed_keys'' '''e''' ''k'' ∉ ''retrieving_keys.keys'': * # Non sa nulla di ''k''. * Se dist(h~-,,p,,-~(k), n) < dist(h~-,,p,,-~(k), respondant): * Avvia il recupero di ''k''. * Attendi qualche istante per non gravare sulle prestazioni della rete. * Metti in ''exclude_tuple_list'' il nodo ''respondant''. * Se riceve !PeersNoParticipantsInNetworkError: # L'algoritmo termina. === IPeersResponse ttl_db_got_request(IPeersRequest r, Object k) throws !PeersRefuseExecutionError === * Quando riceve una richiesta ''r'' per la chiave ''k'', la classe !PeerService del servizio ''p'' fa queste operazioni: * Se ''timer_non_exhaustive'' '''non''' è espirato: * Se ''k'' ∈ ''my_records.keys'': * Se ''r'' è di scrittura (o di lettura+scrittura) e prevede la cancellazione di ''k'': * Elabora normalmente la richiesta ''r'', rimuovendo l'elemento da ''my_records''. * Mette ''k'' in ''removed_keys''. * # L'algoritmo termina. * Altrimenti: * Elabora normalmente la richiesta ''r'', non sono previste variazioni in ''my_records.keys''. * # L'algoritmo termina. * Altrimenti-Se ''k'' ∈ ''removed_keys'': * Se ''r'' è di lettura o lettura+scrittura: * Restituisce al richiedente l'eccezione NOT_FOUND. * # L'algoritmo termina. * Altrimenti: * # ''r'' è di scrittura: * Rimuove ''k'' da ''removed_keys''. * Se la memoria destinata da ''n'' al servizio ''p'' non è esaurita (considerando ''my_records.size'' + ''retrieving_keys.size''): * Elabora normalmente la richiesta ''r'', aggiungendo un record in ''my_records[k]''. * # L'algoritmo termina. * Altrimenti: * Resetta il tempo di ''timer_non_exhaustive'' a TTL. * Rifiuta di elaborare ''r'' (perché ''out of memory''). * # L'algoritmo termina. * Altrimenti-Se ''k'' ∈ ''retrieving_keys.keys'': * Se ''r'' è di sola lettura: * Rifiuta di elaborare ''r'' (perché ''non esaustivo''). * # L'algoritmo termina. * Altrimenti: * # ''r'' è di scrittura o lettura+scrittura: * Attende di ricevere una comunicazione dal canale ''retrieving_keys[k]''. * Se ''r'' prevede una lettura prima della scrittura: * Se ''k'' ∉ ''my_records.keys'': * # Il nodo non ha il record ancora. * Mette ''k'' in ''removed_keys''. * Restituisce al richiedente l'eccezione NOT_FOUND. * # L'algoritmo termina. * Altrimenti: * Elabora normalmente la richiesta ''r''. * # L'algoritmo termina. * Altrimenti: * Elabora normalmente la richiesta ''r''. * # L'algoritmo termina. * Altrimenti: * # Non sa nulla di ''k''. * Se la memoria destinata da ''n'' al servizio ''p'' non è esaurita (considerando ''my_records.size'' + ''retrieving_keys.size''): * Mette un nuovo canale in ''retrieving_keys[k]''. * In una nuova tasklet: * Avvia il recupero di ''k''. * Resetta il tempo di ''timer_non_exhaustive'' a TTL. * Rifiuta di elaborare ''r'' (perché ''non esaustivo''). * # L'algoritmo termina. * Altrimenti: * # È ''esaustivo''. * Svuota la lista ''removed_keys''. * Se ''k'' ∈ ''my_records.keys'': * Elabora normalmente la richiesta ''r''. * # L'algoritmo termina. * Altrimenti: * Se ''r'' è di sola lettura o di lettura+scrittura: * Restituisce al richiedente l'eccezione NOT_FOUND. * # L'algoritmo termina. * Altrimenti: * # ''r'' è di scrittura: * Se la memoria destinata da ''n'' al servizio ''p'' non è esaurita (considerando ''my_records.size'' + ''retrieving_keys.size''): * Elabora normalmente la richiesta ''r'', aggiungendo un record in ''my_records[k]''. * # L'algoritmo termina. * Altrimenti: * Resetta il tempo di ''timer_non_exhaustive'' a TTL. * Rifiuta di elaborare ''r'' (perché ''out of memory''). * # L'algoritmo termina. === internal void ttl_db_retrieve_record(Object k) === * Quando vuole recuperare il record per la chiave ''k'', la classe !PeerService del servizio ''p'' fa queste operazioni: * Prepara la richiesta ''r'' che dice "wait_then_send_record(k)". * '''Nota''': chi riceve una richiesta "wait_then_send_record(k)" la tratta come una normale richiesta di sola lettura per la chiave ''k'', a parte l'attesa. * Try: * Esegue ''ret = contact_peer(p_id, peer_tuple(k), r, timeout_exec, True, out respondant)''. * Se ''ret'' = !NotFoundError: * Se ''k'' ∉ ''removed_keys'': * Aggiunge ''k'' a ''removed_keys''. * Atomicamente (senza schedulare altre tasklet) invia un messaggio a tutti quelli che sono in attesa su ''retrieving_keys[k]'' e rimuove ''k'' da ''retrieving_keys''. * Altrimenti: * Scrive ''ret'' in ''my_records[k]''. * Rimuove ''k'' da ''removed_keys'', se c'era. * Atomicamente (senza schedulare altre tasklet) invia un messaggio a tutti quelli che sono in attesa su ''retrieving_keys[k]'' e rimuove ''k'' da ''retrieving_keys''. * Se riceve !PeersNoParticipantsInNetworkError: * Se ''k'' ∉ ''removed_keys'': * Aggiunge ''k'' a ''removed_keys''. * Atomicamente (senza schedulare altre tasklet) invia un messaggio a tutti quelli che sono in attesa su ''retrieving_keys[k]'' e rimuove ''k'' da ''retrieving_keys''. * Se riceve !PeersRefuseExecutionError: * Se ''k'' ∉ ''removed_keys'': * Aggiunge ''k'' a ''removed_keys''. * Atomicamente (senza schedulare altre tasklet) invia un messaggio a tutti quelli che sono in attesa su ''retrieving_keys[k]'' e rimuove ''k'' da ''retrieving_keys''. === void fixed_keys_db_begin(List<Object> K) === * Quando il bootstrap è completo, la classe !PeerService del servizio ''p'' fa queste operazioni: * Crea una lista vuota di chiavi ''k'' di cui il recupero è ancora da iniziare, ''not_started_keys''. * Crea una mappa vuota di chiavi ''k'' in corso di recupero associate ad un canale di comunicazione tra tasklet, ''retrieving_keys''. * Per ogni chiave ''k'' nell'insieme completo ''K'': * Aggiunge ''k'' a ''not_started_keys''. * Per ogni chiave ''k'' nell'insieme completo ''K'': * Sia ''respondant'' il detentore corrente di ''k''. Lo trova con una chiamata a contact_peer in cui non chiede nulla. * Se dist(h~-,,p,,-~(k), n) < dist(h~-,,p,,-~(k), respondant): * Mette un nuovo canale in ''retrieving_keys[k]''. * In una nuova tasklet: * Avvia il recupero di ''k''. * Rimuove ''k'' da ''not_started_keys''. * Opzionalmente, a seconda di quante sono le chiavi, attendi qualche istante per non gravare sulle prestazioni della rete. * Altrimenti: * Rimuove ''k'' da ''not_started_keys''. === IPeersResponse fixed_keys_db_got_request(IPeersRequest r, Object k) throws !PeersRefuseExecutionError === * Quando riceve una richiesta ''r'' per la chiave ''k'', la classe !PeerService del servizio ''p'' fa queste operazioni: * Se k ∈ not_started_keys: * Rifiuta di elaborare ''r'' (perché ''non esaustivo''). * Se k ∈ retrieving_keys.keys: * Se ''r'' è di sola lettura: * Rifiuta di elaborare ''r'' (perché ''non esaustivo''). * # L'algoritmo termina. * Altrimenti: * # ''r'' è di scrittura: * Attende di ricevere una comunicazione dal canale ''retrieving_keys[k]''. * Elabora normalmente la richiesta ''r''. * # L'algoritmo termina. * Altrimenti: * Elabora normalmente la richiesta ''r''. * # L'algoritmo termina. === internal void fixed_keys_db_retrieve_record(Object k) === * Quando vuole recuperare il record per la chiave ''k'', la classe !PeerService del servizio ''p'' fa queste operazioni: * Prepara la richiesta ''r'' che dice "wait_then_send_record(k)". * '''Nota''': chi riceve una richiesta "wait_then_send_record(k)" la tratta come una normale richiesta di sola lettura per la chiave ''k'', a parte l'attesa. * Try: * Esegue ''ret = contact_peer(p_id, peer_tuple(k), r, timeout_exec, True, out respondant)''. * Scrive ''ret'' in ''my_records[k]''. * Se riceve !PeersNoParticipantsInNetworkError oppure !PeersRefuseExecutionError: * Valorizza ''my_records[k]'' con un appropriato default. * Atomicamente (senza schedulare altre tasklet) invia un messaggio a tutti quelli che sono in attesa su ''retrieving_keys[k]'' e rimuove ''k'' da ''retrieving_keys''. |
Modulo PeerServices - Appunti - Algoritmi 2 / 2
Contents
-
Modulo PeerServices - Appunti - Algoritmi 2 / 2
- Algoritmo di rilevamento di non partecipazione
- Algoritmo di divulgazione della partecipazione
-
Algoritmo di mantenimento di un database distribuito
- bool begin_replica(q, p_id, x̄, r, timeout_exec, out IPeersResponse? resp, out IPeersContinuation cont)
- bool next_replica(IPeersContinuation cont, out IPeersResponse? resp)
- void ttl_db_begin()
- IPeersResponse ttl_db_got_request(IPeersRequest r, Object k) throws !PeersRefuseExecutionError
- internal void ttl_db_retrieve_record(Object k)
- void fixed_keys_db_begin(List<Object> K)
- IPeersResponse fixed_keys_db_got_request(IPeersRequest r, Object k) throws !PeersRefuseExecutionError
- internal void fixed_keys_db_retrieve_record(Object k)
Algoritmo di rilevamento di non partecipazione
bool check_non_participation(p_id, lvl, _pos)
- In questo algoritmo non ci interessa sapere se un g-nodo partecipa, ma solo se è possibile dire con certezza che esso non partecipa. In caso di incertezza l'algoritmo restituisce False.
Produci x̄ = la tupla x̄0·x̄1·...·x̄lvl-1 dove x̄i = 0 per ogni i. La tupla identifica un indirizzo a caso all'interno del g-nodo g = (lvl, _pos). Se lvl = 0 allora x̄ è null.
Produci n = make_tuple_node(new HCoord(0, pos[0]), lvl+1) , cioè la tupla n0·n1·...·nlvl. La tupla che identifica il nodo corrente nel g-nodo di livello lvl+1 in cui il messaggio si muoverà.
m’ = new PeerMessageForwarder.
m’.n = n.
m’.x̄ = x̄.
m’.lvl = lvl.
m’.pos = _pos.
m’.p_id = p_id.
m’.msg_id = un identificativo generato a caso per questo messaggio.
Calcola timeout_instradamento = f ( map_paths.i_peers_get_nodes_in_my_group(lvl + 1) ).
Prepara waiting_answer = new WaitingAnswer(null, (lvl,_pos) come PeerTupleGNode con top = lvl+1). Il fatto che l'istanza di IPeersRequest è a null fa in modo che i metodi remoti che ricevono le notifiche si comportano in modo adeguato. Sostanzialmente dovrebbe cambiare solo il fatto che quando si riceve la segnalazione di get_request si risponde sempre con l'eccezione PeersUnknownMessageError, ache se si è potuto recuperare l'istanza di WaitingAnswer. Sull'istanza di WaitingAnswer viene poi valorizzato il membro response con qualcosa diverso da null solo per indicare che il g-nodo partecipa.
waiting_answer_map[m’.msg_id] = waiting_answer.
IPeersManagerStub gwstub
IPeersManagerStub? failed = null
- While True:
- Try:
- Calcola gwstub = map_paths.i_peers_gateway(lvl, _pos, null, failed)
Se riceve l'eccezione PeersNonexistentDestinationError:
- Restituisci True. Rimuovi waiting_answer_map[m’.msg_id]. Termina algoritmo.
- Try:
- Esegue gwstub.forward_peer_message(m’).
Se riceve StubError o DeserializeError:
- failed = gwstub.
- Continua con la prossima iterazione del ciclo.
- Esci dal ciclo.
- Try:
- Try:
- Sta in attesa su waiting_answer.ch per max timeout_instradamento.
- Se waiting_answer.exclude_gnode ≠ null:
- Restituisce False. Rimuovi waiting_answer_map[m’.msg_id]. Termina algoritmo.
- Altrimenti-Se waiting_answer.non_participant_gnode ≠ null:
- # significa che abbiamo ricevuto notizia di un gnodo non partecipante.
waiting_answer.non_participant_gnode è un PeerTupleGNode che rappresenta un g-nodo h dentro il mio g-nodo di livello top.
- Se è visibile nella mia mappa, cioè se (lvl,top) non partecipa:
- Restituisci True. Rimuovi waiting_answer_map[m’.msg_id]. Termina algoritmo.
- Altrimenti:
- Restituisce False. Rimuovi waiting_answer_map[m’.msg_id]. Termina algoritmo.
- Altrimenti-Se waiting_answer.response ≠ null:
- # significa che abbiamo ricevuto il contatto e che lvl,_pos partecipa.
- Restituisce False. Rimuovi waiting_answer_map[m’.msg_id]. Termina algoritmo.
- Altrimenti:
- # significa che abbiamo ricevuto un nuovo valore in waiting_answer.min_target.
- Restituisce False. Rimuovi waiting_answer_map[m’.msg_id]. Termina algoritmo.
Se riceve l'eccezione TimeoutError:
- # dobbiamo trattare waiting_answer.min_target come da escludere.
- Restituisce False. Rimuovi waiting_answer_map[m’.msg_id]. Termina algoritmo.
Algoritmo di divulgazione della partecipazione
void publish_my_participation(p_id)
gn = make_tuple_gnode(new HCoord(0, pos[0]), levels). La tupla n0·n1·...·nlevels-1, che identifica il nodo corrente nella rete.
tempo_attesa = 300 secondi.
iterazioni = 5.
- While True (per sempre):
Se iterazioni > 0:
- Decrementa iterazioni di 1.
- Altrimenti:
- tempo_attesa = 1 giorno + random(1..24*60*60) secondi.
Prepara un IPeersMissingArcHandler missing_handler che in caso di invocazione esegua:
- Calcola tcp_stub = neighbors_factory.i_peers_get_tcp(missing_arc).
- Try:
- tcp_stub.set_participant(p_id, gn).
Se riceve StubError o DeserializeError:
- Ignora.
- Calcola br_stub = neighbors_factory.i_peers_get_broadcast(missing_handler).
- Try:
- br_stub.set_participant(p_id, gn).
Se riceve StubError o DeserializeError:
- Ignora.
- Aspetta tempo_attesa.
void set_participant(int p_id, PeerTupleGNode gn)
E' già stata istanziata lista_recenti un ArrayList di HCoord.
- Se services.has_key(p_id) AND NOT services[p_id].p_is_optional:
- Ignora il messaggio. Algoritmo termina.
int case, HCoord ret.
- Calcola convert_tuple_gnode(gn, out case, out ret).
- Se case = 1:
- Cioè gn rappresenta un mio g-nodo.
- Ignora il messaggio. Algoritmo termina.
- Altrimenti:
- Cioè gn rappresenta un g-nodo a cui io non appartengo, ed ho già calcolato in ret il g-nodo visibile nella mia topologia in cui gn si trova.
- Se ret ∈ lista_recenti:
- Ignora il messaggio. Algoritmo termina.
- Altrimenti:
- lista_recenti.add(ret).
- Se NOT participant_maps.has_key(p_id):
participant_maps[p_id] = new ParticipantMap().
- participant_maps[p_id].participant_list.add(ret).
ret_gn = make_tuple_gnode(ret, levels)
Prepara un IPeersMissingArcHandler missing_handler che in caso di invocazione esegua:
- Calcola tcp_stub = neighbors_factory.i_peers_get_tcp(missing_arc).
- Try:
- tcp_stub.set_participant(p_id, ret_gn).
Se riceve StubError o DeserializeError:
- Ignora.
- Calcola br_stub = neighbors_factory.i_peers_get_broadcast(missing_handler).
- Try:
- br_stub.set_participant(p_id, ret_gn).
Se riceve StubError o DeserializeError:
- Ignora.
- Svolgi quanto segue in una nuova tasklet portando dietro ret:
- Aspetta 60 secondi.
- lista_recenti.remove(ret).
Algoritmo di mantenimento di un database distribuito
bool begin_replica(q, p_id, x̄, r, timeout_exec, out IPeersResponse? resp, out IPeersContinuation cont)
- Gli argomenti sono:
int q: il numero delle repliche richieste,
int p_id,
PeerTupleNode x̄: la tupla dell'hash della chiave del record, cioè hp ( k ),
IPeersRequest r: la richiesta di replicare la coppia k,val ,
int timeout_exec,
resp viene valorizzato con la risposta o null;
cont è un oggetto di cui all'esterno si sa solo che implementa l'interfaccia vuota IPeersContinuation.
lista_repliche = new List di PeerTupleNode.
exclude_tuple_list = new PeerTupleGNodeContainer(x̄.tuple.size).
- cont = {q, p_id, x̄, r, timeout_exec, lista_repliche, exclude_tuple_list}.
- Return next_replica(cont, out resp).
bool next_replica(IPeersContinuation cont, out IPeersResponse? resp)
- resp = null.
- Se cont.lista_repliche.size ≥ cont.q:
- Return False.
PeerTupleNode respondant;
- ret = contact_peer(cont.p_id, cont.x̄, cont.r, cont.timeout_exec, True, out respondant, cont.exclude_tuple_list).
Se si riceve l'eccezione PeersNoParticipantsInNetworkError:
- Return False.
- resp = ret.
- aggiungi respondant a cont.lista_repliche.
- aggiungi respondant a cont.exclude_tuple_list.
Return cont.lista_repliche.size < cont.q.
void ttl_db_begin()
Viene avviata in una tasklet dalla classe del servizio, che deriva PeerService.
- Se si tratta di un servizio opzionale, viene chiamata solo dopo che sono state reperite con successo le mappe dei partecipanti ai servizi opzionali.
Riceve come primo argomento una istanza di una classe che implementa l'interfaccia ITemporalDatabaseHandler. Tale classe implementa le operazioni che servono a gestire un database distribuito in cui i record hanno una scadenza, o TTL. Un presupposto è che sia le chiavi sia i valori dei record siano istanze di classi derivate da Object e siano serializzabili.
- L'interfaccia ITemporalDatabaseHandler espone questi metodi:
int msec_ttl: una proprietà che dice il numero di millisecondi prima che un record scada.
bool is_valid_key(Object k): una funzione che dice se k è una chiave valida per il servizio.
bool key_equal_data(Object k1, Object k2) e uint key_hash_data(Object k): metodi le cui firme sono adatte per i delegati Gee.EqualDataFunc<Object> e Gee.HashDataFunc<Object>, per costruire una HashMap o una lista "con funzionalità di ricerca" di chiavi del servizio.
int max_records: una proprietà che dice il numero massimo di record che il nodo può memorizzare per questo servizio.
int my_records_size(): una funzione per sapere il numero di records attualmente memorizzati dal nodo.
Gli argomenti del metodo ttl_db_begin sono:
tdh: istanza di ITemporalDatabaseHandler sopra descritta.
tuple_n: una tupla con tutte le posizioni del nodo da 0 a levels - 1.
timer_non_exhaustive = un timer che scade dopo tdh.msec_ttl millisecondi.
removed_keys = new ArrayList<Object>(tdh.key_equal_data): una lista vuota di chiavi cancellate su richiesta.
retrieving_keys = new HashMap<Object,INtkdChannel>(tdh.key_equal_data, tdh.key_hash_data): una mappa vuota di chiavi in corso di recupero associate ad un canale di comunicazione tra tasklet.
IPeersRequest r = new RequestSendKeys(): la richiesta che dice di inviare tutte le chiavi memorizzate.
Nota: spostare sulla descrizione di ttl_db_got_request. Chi riceve tale richiesta risponde subito con tutte le chiavi k ∈ my_records.keys.
timeout_exec = tempo massimo di esecuzione per r. Una costante.
- Try:
respondant = null.
Esegue IPeersResponse ret = contact_peer(p_id, tuple_n, r, timeout_exec, True, out respondant).
Il valore restituito dovrebbe essere un RequestSendKeysResponse, cioè una lista di Object. Altrimenti la risposta viene ignorata.
Se ret è una istanza di RequestSendKeysResponse:
Per ogni chiave k in ret:
Se is_valid_key(k):
Se k ∉ my_records.keys e k ∉ removed_keys e k ∉ retrieving_keys.keys:
# Non sa nulla di k.
Se dist(hp(k), n) < dist(hp(k), respondant):
Avvia il recupero di k.
- Attendi qualche istante per non gravare sulle prestazioni della rete.
Calcola l_n0 = livello del massimo distinto g-nodo di respondant.
Calcola p_n0 = posizione del massimo distinto g-nodo di respondant.
tuple_n = una tupla con tutte le posizioni del nodo da 0 a l_n0 inclusi.
Prepara exclude_tuple_list = [] una lista di istanze di tuple globali nel g-nodo di ricerca di livello l_n0 + 1.
Per i da 0 a gsize[l_n0] - 1: Se i ≠ p_n0: Metti in exclude_tuple_list il g-nodo (l_n0, i).
Metti in exclude_tuple_list il nodo respondant.
While my_records_size() + retrieving_keys.size < max_records:
# la memoria destinata da n al servizio p non è esaurita.
- Attendi qualche istante per non gravare sulle prestazioni della rete.
Esegue ret = contact_peer(p_id, tuple_n, r, timeout_exec, True, out respondant, exclude_tuple_list).
Il valore restituito dovrebbe essere un RequestSendKeysResponse, cioè una lista di Object. Altrimenti la risposta viene ignorata.
Se ret è una istanza di RequestSendKeysResponse:
Per ogni chiave k in ret:
Se is_valid_key(k):
Se k ∉ my_records.keys e k ∉ removed_keys e k ∉ retrieving_keys.keys:
# Non sa nulla di k.
Se dist(hp(k), n) < dist(hp(k), respondant):
Avvia il recupero di k.
- Attendi qualche istante per non gravare sulle prestazioni della rete.
Metti in exclude_tuple_list il nodo respondant.
Se riceve PeersNoParticipantsInNetworkError:
- # L'algoritmo termina.
IPeersResponse ttl_db_got_request(IPeersRequest r, Object k) throws !PeersRefuseExecutionError
Quando riceve una richiesta r per la chiave k, la classe PeerService del servizio p fa queste operazioni:
Se timer_non_exhaustive non è espirato:
Se k ∈ my_records.keys:
Se r è di scrittura (o di lettura+scrittura) e prevede la cancellazione di k:
Elabora normalmente la richiesta r, rimuovendo l'elemento da my_records.
Mette k in removed_keys.
- # L'algoritmo termina.
- Altrimenti:
Elabora normalmente la richiesta r, non sono previste variazioni in my_records.keys.
- # L'algoritmo termina.
Altrimenti-Se k ∈ removed_keys:
Se r è di lettura o lettura+scrittura:
- Restituisce al richiedente l'eccezione NOT_FOUND.
- # L'algoritmo termina.
- Altrimenti:
# r è di scrittura:
Rimuove k da removed_keys.
Se la memoria destinata da n al servizio p non è esaurita (considerando my_records.size + retrieving_keys.size):
Elabora normalmente la richiesta r, aggiungendo un record in my_records[k].
- # L'algoritmo termina.
- Altrimenti:
Resetta il tempo di timer_non_exhaustive a TTL.
Rifiuta di elaborare r (perché out of memory).
- # L'algoritmo termina.
Altrimenti-Se k ∈ retrieving_keys.keys:
Se r è di sola lettura:
Rifiuta di elaborare r (perché non esaustivo).
- # L'algoritmo termina.
- Altrimenti:
# r è di scrittura o lettura+scrittura:
Attende di ricevere una comunicazione dal canale retrieving_keys[k].
Se r prevede una lettura prima della scrittura:
Se k ∉ my_records.keys:
- # Il nodo non ha il record ancora.
Mette k in removed_keys.
- Restituisce al richiedente l'eccezione NOT_FOUND.
- # L'algoritmo termina.
- Altrimenti:
Elabora normalmente la richiesta r.
- # L'algoritmo termina.
- Altrimenti:
Elabora normalmente la richiesta r.
- # L'algoritmo termina.
- Altrimenti:
# Non sa nulla di k.
Se la memoria destinata da n al servizio p non è esaurita (considerando my_records.size + retrieving_keys.size):
Mette un nuovo canale in retrieving_keys[k].
- In una nuova tasklet:
Avvia il recupero di k.
Resetta il tempo di timer_non_exhaustive a TTL.
Rifiuta di elaborare r (perché non esaustivo).
- # L'algoritmo termina.
- Altrimenti:
# È esaustivo.
Svuota la lista removed_keys.
Se k ∈ my_records.keys:
Elabora normalmente la richiesta r.
- # L'algoritmo termina.
- Altrimenti:
Se r è di sola lettura o di lettura+scrittura:
- Restituisce al richiedente l'eccezione NOT_FOUND.
- # L'algoritmo termina.
- Altrimenti:
# r è di scrittura:
Se la memoria destinata da n al servizio p non è esaurita (considerando my_records.size + retrieving_keys.size):
Elabora normalmente la richiesta r, aggiungendo un record in my_records[k].
- # L'algoritmo termina.
- Altrimenti:
Resetta il tempo di timer_non_exhaustive a TTL.
Rifiuta di elaborare r (perché out of memory).
- # L'algoritmo termina.
internal void ttl_db_retrieve_record(Object k)
Quando vuole recuperare il record per la chiave k, la classe PeerService del servizio p fa queste operazioni:
Prepara la richiesta r che dice "wait_then_send_record(k)".
Nota: chi riceve una richiesta "wait_then_send_record(k)" la tratta come una normale richiesta di sola lettura per la chiave k, a parte l'attesa.
- Try:
Esegue ret = contact_peer(p_id, peer_tuple(k), r, timeout_exec, True, out respondant).
Se ret = NotFoundError:
Se k ∉ removed_keys:
Aggiunge k a removed_keys.
Atomicamente (senza schedulare altre tasklet) invia un messaggio a tutti quelli che sono in attesa su retrieving_keys[k] e rimuove k da retrieving_keys.
- Altrimenti:
Scrive ret in my_records[k].
Rimuove k da removed_keys, se c'era.
Atomicamente (senza schedulare altre tasklet) invia un messaggio a tutti quelli che sono in attesa su retrieving_keys[k] e rimuove k da retrieving_keys.
Se riceve PeersNoParticipantsInNetworkError:
Se k ∉ removed_keys:
Aggiunge k a removed_keys.
Atomicamente (senza schedulare altre tasklet) invia un messaggio a tutti quelli che sono in attesa su retrieving_keys[k] e rimuove k da retrieving_keys.
Se riceve PeersRefuseExecutionError:
Se k ∉ removed_keys:
Aggiunge k a removed_keys.
Atomicamente (senza schedulare altre tasklet) invia un messaggio a tutti quelli che sono in attesa su retrieving_keys[k] e rimuove k da retrieving_keys.
void fixed_keys_db_begin(List<Object> K)
Quando il bootstrap è completo, la classe PeerService del servizio p fa queste operazioni:
Crea una lista vuota di chiavi k di cui il recupero è ancora da iniziare, not_started_keys.
Crea una mappa vuota di chiavi k in corso di recupero associate ad un canale di comunicazione tra tasklet, retrieving_keys.
Per ogni chiave k nell'insieme completo K:
Aggiunge k a not_started_keys.
Per ogni chiave k nell'insieme completo K:
Sia respondant il detentore corrente di k. Lo trova con una chiamata a contact_peer in cui non chiede nulla.
Se dist(hp(k), n) < dist(hp(k), respondant):
Mette un nuovo canale in retrieving_keys[k].
- In una nuova tasklet:
Avvia il recupero di k.
Rimuove k da not_started_keys.
- Opzionalmente, a seconda di quante sono le chiavi, attendi qualche istante per non gravare sulle prestazioni della rete.
- Altrimenti:
Rimuove k da not_started_keys.
IPeersResponse fixed_keys_db_got_request(IPeersRequest r, Object k) throws !PeersRefuseExecutionError
Quando riceve una richiesta r per la chiave k, la classe PeerService del servizio p fa queste operazioni:
- Se k ∈ not_started_keys:
Rifiuta di elaborare r (perché non esaustivo).
- Se k ∈ retrieving_keys.keys:
Se r è di sola lettura:
Rifiuta di elaborare r (perché non esaustivo).
- # L'algoritmo termina.
- Altrimenti:
# r è di scrittura:
Attende di ricevere una comunicazione dal canale retrieving_keys[k].
Elabora normalmente la richiesta r.
- # L'algoritmo termina.
- Altrimenti:
Elabora normalmente la richiesta r.
- # L'algoritmo termina.
- Se k ∈ not_started_keys:
internal void fixed_keys_db_retrieve_record(Object k)
Quando vuole recuperare il record per la chiave k, la classe PeerService del servizio p fa queste operazioni:
Prepara la richiesta r che dice "wait_then_send_record(k)".
Nota: chi riceve una richiesta "wait_then_send_record(k)" la tratta come una normale richiesta di sola lettura per la chiave k, a parte l'attesa.
- Try:
Esegue ret = contact_peer(p_id, peer_tuple(k), r, timeout_exec, True, out respondant).
Scrive ret in my_records[k].
Se riceve PeersNoParticipantsInNetworkError oppure PeersRefuseExecutionError:
Valorizza my_records[k] con un appropriato default.
Atomicamente (senza schedulare altre tasklet) invia un messaggio a tutti quelli che sono in attesa su retrieving_keys[k] e rimuove k da retrieving_keys.