20625
Comment:
|
20894
|
Deletions are marked like this. | Additions are marked like this. |
Line 55: | Line 55: |
=== void publish_my_participation(p_id) === | === publish_my_participation === '''void publish_my_participation(p_id)''' |
Line 77: | Line 78: |
=== void set_participant(int p_id, PeerTupleGNode gn) === | === set_participant === '''void set_participant(int p_id, PeerTupleGNode gn)''' |
Line 112: | Line 114: |
=== bool begin_replica(q, p_id, x̄, r, timeout_exec, out IPeersResponse? resp, out IPeersContinuation cont) === | === begin_replica === '''bool begin_replica(q, p_id, x̄, r, timeout_exec, out IPeersResponse? resp, out IPeersContinuation cont)''' |
Line 126: | Line 129: |
=== bool next_replica(IPeersContinuation cont, out IPeersResponse? resp) === | === next_replica === '''bool next_replica(IPeersContinuation cont, out IPeersResponse? resp)''' |
Line 139: | Line 143: |
=== void ttl_db_begin() === | === ttl_db_begin === '''void ttl_db_begin()''' |
Line 193: | Line 198: |
=== IPeersResponse ttl_db_got_request(IPeersRequest r, Object k) throws !PeersRefuseExecutionError === | === ttl_db_got_request === '''IPeersResponse ttl_db_got_request(IPeersRequest r, Object k) throws !PeersRefuseExecutionError''' |
Line 266: | Line 272: |
=== internal void ttl_db_retrieve_record(Object k) === | === ttl_db_retrieve_record === '''internal void ttl_db_retrieve_record(Object k)''' |
Line 289: | Line 296: |
=== void fixed_keys_db_begin(List<Object> K) === | === fixed_keys_db_begin === '''void fixed_keys_db_begin(List<Object> K)''' |
Line 306: | Line 314: |
=== IPeersResponse fixed_keys_db_got_request(IPeersRequest r, Object k) throws !PeersRefuseExecutionError === | === fixed_keys_db_got_request === '''IPeersResponse fixed_keys_db_got_request(IPeersRequest r, Object k) throws !PeersRefuseExecutionError''' |
Line 323: | Line 332: |
=== internal void fixed_keys_db_retrieve_record(Object k) === | === fixed_keys_db_retrieve_record === '''internal void fixed_keys_db_retrieve_record(Object k)''' |
Modulo PeerServices - Appunti - Algoritmi 2 / 2
Contents
Algoritmo di rilevamento di non partecipazione
check_non_participation
bool check_non_participation(p_id, lvl, _pos)
- In questo algoritmo non ci interessa sapere se un g-nodo partecipa, ma solo se è possibile dire con certezza che esso non partecipa. In caso di incertezza l'algoritmo restituisce False.
Produci x̄ = la tupla x̄0·x̄1·...·x̄lvl-1 dove x̄i = 0 per ogni i. La tupla identifica un indirizzo a caso all'interno del g-nodo g = (lvl, _pos). Se lvl = 0 allora x̄ è null.
Produci n = make_tuple_node(new HCoord(0, pos[0]), lvl+1) , cioè la tupla n0·n1·...·nlvl. La tupla che identifica il nodo corrente nel g-nodo di livello lvl+1 in cui il messaggio si muoverà.
m’ = new PeerMessageForwarder.
m’.n = n.
m’.x̄ = x̄.
m’.lvl = lvl.
m’.pos = _pos.
m’.p_id = p_id.
m’.msg_id = un identificativo generato a caso per questo messaggio.
Calcola timeout_instradamento = f ( map_paths.i_peers_get_nodes_in_my_group(lvl + 1) ).
Prepara waiting_answer = new WaitingAnswer(null, (lvl,_pos) come PeerTupleGNode con top = lvl+1). Il fatto che l'istanza di IPeersRequest è a null fa in modo che i metodi remoti che ricevono le notifiche si comportano in modo adeguato. Sostanzialmente dovrebbe cambiare solo il fatto che quando si riceve la segnalazione di get_request si risponde sempre con l'eccezione PeersUnknownMessageError, ache se si è potuto recuperare l'istanza di WaitingAnswer. Sull'istanza di WaitingAnswer viene poi valorizzato il membro response con qualcosa diverso da null solo per indicare che il g-nodo partecipa.
waiting_answer_map[m’.msg_id] = waiting_answer.
IPeersManagerStub gwstub
IPeersManagerStub? failed = null
- While True:
- Try:
- Calcola gwstub = map_paths.i_peers_gateway(lvl, _pos, null, failed)
Se riceve l'eccezione PeersNonexistentDestinationError:
- Restituisci True. Rimuovi waiting_answer_map[m’.msg_id]. Termina algoritmo.
- Try:
- Esegue gwstub.forward_peer_message(m’).
Se riceve StubError o DeserializeError:
- failed = gwstub.
- Continua con la prossima iterazione del ciclo.
- Esci dal ciclo.
- Try:
- Try:
- Sta in attesa su waiting_answer.ch per max timeout_instradamento.
- Se waiting_answer.exclude_gnode ≠ null:
- Restituisce False. Rimuovi waiting_answer_map[m’.msg_id]. Termina algoritmo.
- Altrimenti-Se waiting_answer.non_participant_gnode ≠ null:
- # significa che abbiamo ricevuto notizia di un gnodo non partecipante.
waiting_answer.non_participant_gnode è un PeerTupleGNode che rappresenta un g-nodo h dentro il mio g-nodo di livello top.
- Se è visibile nella mia mappa, cioè se (lvl,top) non partecipa:
- Restituisci True. Rimuovi waiting_answer_map[m’.msg_id]. Termina algoritmo.
- Altrimenti:
- Restituisce False. Rimuovi waiting_answer_map[m’.msg_id]. Termina algoritmo.
- Altrimenti-Se waiting_answer.response ≠ null:
- # significa che abbiamo ricevuto il contatto e che lvl,_pos partecipa.
- Restituisce False. Rimuovi waiting_answer_map[m’.msg_id]. Termina algoritmo.
- Altrimenti:
- # significa che abbiamo ricevuto un nuovo valore in waiting_answer.min_target.
- Restituisce False. Rimuovi waiting_answer_map[m’.msg_id]. Termina algoritmo.
Se riceve l'eccezione TimeoutError:
- # dobbiamo trattare waiting_answer.min_target come da escludere.
- Restituisce False. Rimuovi waiting_answer_map[m’.msg_id]. Termina algoritmo.
Algoritmo di divulgazione della partecipazione
publish_my_participation
void publish_my_participation(p_id)
gn = make_tuple_gnode(new HCoord(0, pos[0]), levels). La tupla n0·n1·...·nlevels-1, che identifica il nodo corrente nella rete.
tempo_attesa = 300 secondi.
iterazioni = 5.
- While True (per sempre):
Se iterazioni > 0:
- Decrementa iterazioni di 1.
- Altrimenti:
- tempo_attesa = 1 giorno + random(1..24*60*60) secondi.
Prepara un IPeersMissingArcHandler missing_handler che in caso di invocazione esegua:
- Calcola tcp_stub = neighbors_factory.i_peers_get_tcp(missing_arc).
- Try:
- tcp_stub.set_participant(p_id, gn).
Se riceve StubError o DeserializeError:
- Ignora.
- Calcola br_stub = neighbors_factory.i_peers_get_broadcast(missing_handler).
- Try:
- br_stub.set_participant(p_id, gn).
Se riceve StubError o DeserializeError:
- Ignora.
- Aspetta tempo_attesa.
set_participant
void set_participant(int p_id, PeerTupleGNode gn)
E' già stata istanziata lista_recenti un ArrayList di HCoord.
- Se services.has_key(p_id) AND NOT services[p_id].p_is_optional:
- Ignora il messaggio. Algoritmo termina.
int case, HCoord ret.
- Calcola convert_tuple_gnode(gn, out case, out ret).
- Se case = 1:
- Cioè gn rappresenta un mio g-nodo.
- Ignora il messaggio. Algoritmo termina.
- Altrimenti:
- Cioè gn rappresenta un g-nodo a cui io non appartengo, ed ho già calcolato in ret il g-nodo visibile nella mia topologia in cui gn si trova.
- Se ret ∈ lista_recenti:
- Ignora il messaggio. Algoritmo termina.
- Altrimenti:
- lista_recenti.add(ret).
- Se NOT participant_maps.has_key(p_id):
participant_maps[p_id] = new ParticipantMap().
- participant_maps[p_id].participant_list.add(ret).
ret_gn = make_tuple_gnode(ret, levels)
Prepara un IPeersMissingArcHandler missing_handler che in caso di invocazione esegua:
- Calcola tcp_stub = neighbors_factory.i_peers_get_tcp(missing_arc).
- Try:
- tcp_stub.set_participant(p_id, ret_gn).
Se riceve StubError o DeserializeError:
- Ignora.
- Calcola br_stub = neighbors_factory.i_peers_get_broadcast(missing_handler).
- Try:
- br_stub.set_participant(p_id, ret_gn).
Se riceve StubError o DeserializeError:
- Ignora.
- Svolgi quanto segue in una nuova tasklet portando dietro ret:
- Aspetta 60 secondi.
- lista_recenti.remove(ret).
Algoritmo di mantenimento di un database distribuito
begin_replica
bool begin_replica(q, p_id, x̄, r, timeout_exec, out IPeersResponse? resp, out IPeersContinuation cont)
- Gli argomenti sono:
int q: il numero delle repliche richieste,
int p_id,
PeerTupleNode x̄: la tupla dell'hash della chiave del record, cioè hp ( k ),
IPeersRequest r: la richiesta di replicare la coppia k,val ,
int timeout_exec,
resp viene valorizzato con la risposta o null;
cont è un oggetto di cui all'esterno si sa solo che implementa l'interfaccia vuota IPeersContinuation.
lista_repliche = new List di PeerTupleNode.
exclude_tuple_list = new PeerTupleGNodeContainer(x̄.tuple.size).
- cont = {q, p_id, x̄, r, timeout_exec, lista_repliche, exclude_tuple_list}.
- Return next_replica(cont, out resp).
next_replica
bool next_replica(IPeersContinuation cont, out IPeersResponse? resp)
- resp = null.
- Se cont.lista_repliche.size ≥ cont.q:
- Return False.
PeerTupleNode respondant;
- ret = contact_peer(cont.p_id, cont.x̄, cont.r, cont.timeout_exec, True, out respondant, cont.exclude_tuple_list).
Se si riceve l'eccezione PeersNoParticipantsInNetworkError:
- Return False.
- resp = ret.
- aggiungi respondant a cont.lista_repliche.
- aggiungi respondant a cont.exclude_tuple_list.
Return cont.lista_repliche.size < cont.q.
ttl_db_begin
void ttl_db_begin()
Viene avviata in una tasklet dalla classe del servizio, che deriva PeerService.
- Se si tratta di un servizio opzionale, viene chiamata solo dopo che sono state reperite con successo le mappe dei partecipanti ai servizi opzionali.
Riceve come primo argomento una istanza di una classe che implementa l'interfaccia ITemporalDatabaseHandler. Tale classe implementa le operazioni che servono a gestire un database distribuito in cui i record hanno una scadenza, o TTL. Un presupposto è che sia le chiavi sia i valori dei record siano istanze di classi derivate da Object e siano serializzabili.
- L'interfaccia ITemporalDatabaseHandler espone questi metodi:
int msec_ttl: una proprietà che dice il numero di millisecondi prima che un record scada.
bool is_valid_key(Object k): una funzione che dice se k è una chiave valida per il servizio.
bool key_equal_data(Object k1, Object k2) e uint key_hash_data(Object k): metodi le cui firme sono adatte per i delegati Gee.EqualDataFunc<Object> e Gee.HashDataFunc<Object>, per costruire una HashMap o una lista "con funzionalità di ricerca" di chiavi del servizio.
int max_records: una proprietà che dice il numero massimo di record che il nodo può memorizzare per questo servizio.
int my_records_size(): una funzione per sapere il numero di records attualmente memorizzati dal nodo.
Gli argomenti del metodo ttl_db_begin sono:
tdh: istanza di ITemporalDatabaseHandler sopra descritta.
tuple_n: una tupla con tutte le posizioni del nodo da 0 a levels - 1.
timer_non_exhaustive = un timer che scade dopo tdh.msec_ttl millisecondi.
removed_keys = new ArrayList<Object>(tdh.key_equal_data): una lista vuota di chiavi cancellate su richiesta.
retrieving_keys = new HashMap<Object,INtkdChannel>(tdh.key_equal_data, tdh.key_hash_data): una mappa vuota di chiavi in corso di recupero associate ad un canale di comunicazione tra tasklet.
IPeersRequest r = new RequestSendKeys(): la richiesta che dice di inviare tutte le chiavi memorizzate.
Nota: spostare sulla descrizione di ttl_db_got_request. Chi riceve tale richiesta risponde subito con tutte le chiavi k ∈ my_records.keys.
timeout_exec = tempo massimo di esecuzione per r. Una costante.
- Try:
respondant = null.
Esegue IPeersResponse ret = contact_peer(p_id, tuple_n, r, timeout_exec, True, out respondant).
Il valore restituito dovrebbe essere un RequestSendKeysResponse, cioè una lista di Object. Altrimenti la risposta viene ignorata.
Se ret è una istanza di RequestSendKeysResponse:
Per ogni chiave k in ret:
Se is_valid_key(k):
Se k ∉ my_records.keys e k ∉ removed_keys e k ∉ retrieving_keys.keys:
# Non sa nulla di k.
Se dist(hp(k), n) < dist(hp(k), respondant):
Avvia il recupero di k.
- Attendi qualche istante per non gravare sulle prestazioni della rete.
Calcola l_n0 = livello del massimo distinto g-nodo di respondant.
Calcola p_n0 = posizione del massimo distinto g-nodo di respondant.
tuple_n = una tupla con tutte le posizioni del nodo da 0 a l_n0 inclusi.
Prepara exclude_tuple_list = [] una lista di istanze di tuple globali nel g-nodo di ricerca di livello l_n0 + 1.
Per i da 0 a gsize[l_n0] - 1: Se i ≠ p_n0: Metti in exclude_tuple_list il g-nodo (l_n0, i).
Metti in exclude_tuple_list il nodo respondant.
While my_records_size() + retrieving_keys.size < max_records:
# la memoria destinata da n al servizio p non è esaurita.
- Attendi qualche istante per non gravare sulle prestazioni della rete.
Esegue ret = contact_peer(p_id, tuple_n, r, timeout_exec, True, out respondant, exclude_tuple_list).
Il valore restituito dovrebbe essere un RequestSendKeysResponse, cioè una lista di Object. Altrimenti la risposta viene ignorata.
Se ret è una istanza di RequestSendKeysResponse:
Per ogni chiave k in ret:
Se is_valid_key(k):
Se k ∉ my_records.keys e k ∉ removed_keys e k ∉ retrieving_keys.keys:
# Non sa nulla di k.
Se dist(hp(k), n) < dist(hp(k), respondant):
Avvia il recupero di k.
- Attendi qualche istante per non gravare sulle prestazioni della rete.
Metti in exclude_tuple_list il nodo respondant.
Se riceve PeersNoParticipantsInNetworkError:
- # L'algoritmo termina.
ttl_db_got_request
IPeersResponse ttl_db_got_request(IPeersRequest r, Object k) throws PeersRefuseExecutionError
Quando riceve una richiesta r per la chiave k, la classe PeerService del servizio p fa queste operazioni:
Se timer_non_exhaustive non è espirato:
Se k ∈ my_records.keys:
Se r è di scrittura (o di lettura+scrittura) e prevede la cancellazione di k:
Elabora normalmente la richiesta r, rimuovendo l'elemento da my_records.
Mette k in removed_keys.
- # L'algoritmo termina.
- Altrimenti:
Elabora normalmente la richiesta r, non sono previste variazioni in my_records.keys.
- # L'algoritmo termina.
Altrimenti-Se k ∈ removed_keys:
Se r è di lettura o lettura+scrittura:
- Restituisce al richiedente l'eccezione NOT_FOUND.
- # L'algoritmo termina.
- Altrimenti:
# r è di scrittura:
Rimuove k da removed_keys.
Se la memoria destinata da n al servizio p non è esaurita (considerando my_records.size + retrieving_keys.size):
Elabora normalmente la richiesta r, aggiungendo un record in my_records[k].
- # L'algoritmo termina.
- Altrimenti:
Resetta il tempo di timer_non_exhaustive a TTL.
Rifiuta di elaborare r (perché out of memory).
- # L'algoritmo termina.
Altrimenti-Se k ∈ retrieving_keys.keys:
Se r è di sola lettura:
Rifiuta di elaborare r (perché non esaustivo).
- # L'algoritmo termina.
- Altrimenti:
# r è di scrittura o lettura+scrittura:
Attende di ricevere una comunicazione dal canale retrieving_keys[k].
Se r prevede una lettura prima della scrittura:
Se k ∉ my_records.keys:
- # Il nodo non ha il record ancora.
Mette k in removed_keys.
- Restituisce al richiedente l'eccezione NOT_FOUND.
- # L'algoritmo termina.
- Altrimenti:
Elabora normalmente la richiesta r.
- # L'algoritmo termina.
- Altrimenti:
Elabora normalmente la richiesta r.
- # L'algoritmo termina.
- Altrimenti:
# Non sa nulla di k.
Se la memoria destinata da n al servizio p non è esaurita (considerando my_records.size + retrieving_keys.size):
Mette un nuovo canale in retrieving_keys[k].
- In una nuova tasklet:
Avvia il recupero di k.
Resetta il tempo di timer_non_exhaustive a TTL.
Rifiuta di elaborare r (perché non esaustivo).
- # L'algoritmo termina.
- Altrimenti:
# È esaustivo.
Svuota la lista removed_keys.
Se k ∈ my_records.keys:
Elabora normalmente la richiesta r.
- # L'algoritmo termina.
- Altrimenti:
Se r è di sola lettura o di lettura+scrittura:
- Restituisce al richiedente l'eccezione NOT_FOUND.
- # L'algoritmo termina.
- Altrimenti:
# r è di scrittura:
Se la memoria destinata da n al servizio p non è esaurita (considerando my_records.size + retrieving_keys.size):
Elabora normalmente la richiesta r, aggiungendo un record in my_records[k].
- # L'algoritmo termina.
- Altrimenti:
Resetta il tempo di timer_non_exhaustive a TTL.
Rifiuta di elaborare r (perché out of memory).
- # L'algoritmo termina.
ttl_db_retrieve_record
internal void ttl_db_retrieve_record(Object k)
Quando vuole recuperare il record per la chiave k, la classe PeerService del servizio p fa queste operazioni:
Prepara la richiesta r che dice "wait_then_send_record(k)".
Nota: chi riceve una richiesta "wait_then_send_record(k)" la tratta come una normale richiesta di sola lettura per la chiave k, a parte l'attesa.
- Try:
Esegue ret = contact_peer(p_id, peer_tuple(k), r, timeout_exec, True, out respondant).
Se ret = NotFoundError:
Se k ∉ removed_keys:
Aggiunge k a removed_keys.
Atomicamente (senza schedulare altre tasklet) invia un messaggio a tutti quelli che sono in attesa su retrieving_keys[k] e rimuove k da retrieving_keys.
- Altrimenti:
Scrive ret in my_records[k].
Rimuove k da removed_keys, se c'era.
Atomicamente (senza schedulare altre tasklet) invia un messaggio a tutti quelli che sono in attesa su retrieving_keys[k] e rimuove k da retrieving_keys.
Se riceve PeersNoParticipantsInNetworkError:
Se k ∉ removed_keys:
Aggiunge k a removed_keys.
Atomicamente (senza schedulare altre tasklet) invia un messaggio a tutti quelli che sono in attesa su retrieving_keys[k] e rimuove k da retrieving_keys.
Se riceve PeersRefuseExecutionError:
Se k ∉ removed_keys:
Aggiunge k a removed_keys.
Atomicamente (senza schedulare altre tasklet) invia un messaggio a tutti quelli che sono in attesa su retrieving_keys[k] e rimuove k da retrieving_keys.
fixed_keys_db_begin
void fixed_keys_db_begin(List<Object> K)
Quando il bootstrap è completo, la classe PeerService del servizio p fa queste operazioni:
Crea una lista vuota di chiavi k di cui il recupero è ancora da iniziare, not_started_keys.
Crea una mappa vuota di chiavi k in corso di recupero associate ad un canale di comunicazione tra tasklet, retrieving_keys.
Per ogni chiave k nell'insieme completo K:
Aggiunge k a not_started_keys.
Per ogni chiave k nell'insieme completo K:
Sia respondant il detentore corrente di k. Lo trova con una chiamata a contact_peer in cui non chiede nulla.
Se dist(hp(k), n) < dist(hp(k), respondant):
Mette un nuovo canale in retrieving_keys[k].
- In una nuova tasklet:
Avvia il recupero di k.
Rimuove k da not_started_keys.
- Opzionalmente, a seconda di quante sono le chiavi, attendi qualche istante per non gravare sulle prestazioni della rete.
- Altrimenti:
Rimuove k da not_started_keys.
fixed_keys_db_got_request
IPeersResponse fixed_keys_db_got_request(IPeersRequest r, Object k) throws PeersRefuseExecutionError
Quando riceve una richiesta r per la chiave k, la classe PeerService del servizio p fa queste operazioni:
- Se k ∈ not_started_keys:
Rifiuta di elaborare r (perché non esaustivo).
- Se k ∈ retrieving_keys.keys:
Se r è di sola lettura:
Rifiuta di elaborare r (perché non esaustivo).
- # L'algoritmo termina.
- Altrimenti:
# r è di scrittura:
Attende di ricevere una comunicazione dal canale retrieving_keys[k].
Elabora normalmente la richiesta r.
- # L'algoritmo termina.
- Altrimenti:
Elabora normalmente la richiesta r.
- # L'algoritmo termina.
- Se k ∈ not_started_keys:
fixed_keys_db_retrieve_record
internal void fixed_keys_db_retrieve_record(Object k)
Quando vuole recuperare il record per la chiave k, la classe PeerService del servizio p fa queste operazioni:
Prepara la richiesta r che dice "wait_then_send_record(k)".
Nota: chi riceve una richiesta "wait_then_send_record(k)" la tratta come una normale richiesta di sola lettura per la chiave k, a parte l'attesa.
- Try:
Esegue ret = contact_peer(p_id, peer_tuple(k), r, timeout_exec, True, out respondant).
Scrive ret in my_records[k].
Se riceve PeersNoParticipantsInNetworkError oppure PeersRefuseExecutionError:
Valorizza my_records[k] con un appropriato default.
Atomicamente (senza schedulare altre tasklet) invia un messaggio a tutti quelli che sono in attesa su retrieving_keys[k] e rimuove k da retrieving_keys.