Modulo PeerServices - Database con un numero esiguo e fisso di chiavi
Contents
Implementiamo un servizio che mantiene un database distribuito. Le chiavi sono in un dominio ben definito e non molto grande.
Ogni nodo partecipante può essere chiamato a mantenere in memoria un record per ognuna delle chiavi nel dominio: quindi non è previsto il caso di OUT_OF_MEMORY. Inoltre esiste, per ogni chiave, un record prefissato che ogni nodo partecipante può usare come record di default quando elabora una richiesta di lettura e nessuno aveva in precedenza salvato un valore: quindi non è previsto il caso di NOT_FOUND.
In questo servizio ipotiziamo che non ci siano vincoli per il client, cioè qualsiasi operazione è permessa (nei limiti del dominio delle chiavi e dei valori) senza alcun tipo di autorizzazione. Comunque risulterà facile implementare in un servizio analogo qualsiasi tipo di vincolo da parte del server; basterà codificare nella classe del valore di ritorno le eccezioni previste dal servizio.
Diamo per assunto che il lettore abbia già letto il documento Dettagli Tecnici.
Effetto di visibilità locale del dato
Opzionalmente, il servizio può stabilire di dare ad alcune chiavi un effetto di visibilità locale del dato. Con questo intendiamo che per una chiave k ∈ K si può identificare un livello l. Se un nodo n fa richiesta di memorizzare il valore v per la chiave k, tale valore sarà visibile ai soli nodi appartenenti allo stesso g-nodo di livello l a cui appartiene il nodo n. In altri g-nodi il valore associato alla chiave k sarà distinto.
Per ottenere questo sarà sufficiente che la classe del servizio implementi il metodo evaluate_hash_node della sua istanza di IDatabaseDescriptor in modo da restituire, quando il parametro è la chiave k, una tupla di soli l elementi. La stessa implementazione ovviamente dovrà essere usata dalla classe del client del servizio nel suo metodo perfect_tuple.
Operazione di sola lettura
Il nodo q vuole leggere il valore del record per una chiave k nel database distribuito.
Ci sono 2 possibili esiti finali a questa richiesta:
Il nodo q viene informato che al momento nella rete non c'è nessun partecipante al servizio. Chiamiamolo esito "NO_PARTICIPANTS".
Il nodo q viene informato che il record per la chiave k ha il valore v. Chiamiamolo esito "OK".
Operazione di scrittura
Il nodo q vuole modificare il valore del record per una chiave k nel database distribuito.
Ci sono 2 possibili esiti finali a questa richiesta:
Il nodo q viene informato che al momento nella rete non c'è nessun partecipante al servizio. Chiamiamolo esito "NO_PARTICIPANTS".
Il nodo q viene informato che il record per la chiave k è stato modificato. Chiamiamolo esito "OK".
Operazione di replica
Il nodo n0 vuole replicare un record k=w che esso ha appena scritto nella sua memoria. La richiesta giunge ad un nodo che viene dopo n0 nella ricerca dell'hash-node.
Ci sono 2 possibili esiti finali a questa richiesta:
Il nodo n0 viene informato che al momento nella rete non c'è nessun partecipante al servizio. Chiamiamolo esito "NO_PARTICIPANTS".
Il nodo n0 viene informato che il record k=w è stato replicato. Chiamiamolo esito "OK".
Fase iniziale
Sia n un nodo partecipante che entra in una rete esistente o che crea una nuova rete. Sia level_new_gnode il livello del nuovo g-nodo che il nodo n ha formato. I casi estremi possono essere l'intera rete, cioè levels, o 0.
Sia K l'insieme completo delle chiavi del servizio p, preferibilmente ordinato per avere per prime le chiavi che hanno una visibilità locale più ristretta, se il servizio lo prevede.
Immediatamente il nodo n avvia un esame su tutte le possibili chiavi k ∈ K. Sia l il livello del g-nodo in cui la visibilità del dato per la chiave k è circoscritta, oppure sia l = levels.
Se level_new_gnode ≥ l, allora il nodo n inizializza il record. Cioè mette nella sua memoria il record di default associato a k. Inoltre mette a completato lo stato dell'operazione iniziale di recupero del record per la chiave k. Di seguito analiziamo cosa fa il nodo n se, invece, non deve inizializzare il record per la chiave k.
Il nodo n avvia un procedimento di recupero del record associato a k, con attesa del tempo critico di coerenza δ.
Il procedimento di recupero consiste nell'inviare la richiesta al nodo che in precedenza era detentore del record per la chiave k, chiamiamolo current(k).
Tali richieste vanno intervallate tra di loro per non appesantire la rete, ma non troppo tempo: infatti non ci sono altri punti in cui le operazioni di recupero vengono avviate e il nodo corrente potrebbe essere chiamato a servire un record. Quindi il client dovrebbe attendere a lungo. Il tempo massimo per l'espletamento di queste operazioni iniziali dovrebbe essere sufficientemente contenuto per il fatto che il numero delle chiavi è esiguo (come prevede questa tipologia di servizi) e che le chiavi esaminate per prime sono quelle che hanno una visibilità locale più ristretta. Diciamo dunque che dopo aver avviato (in una nuova tasklet) l'operazione di recupero di un record (ci riferiamo quindi ai record che non vengono inizializzati da n) si attendono 200 millisecondi.
Come detto, tale procedimento prevede che se il nodo n durante questo tempo riceve richieste per la chiave k che sono di scrittura, allora non le rifiuta subito, ma le mette in attesa. Siccome ora le richieste di scrittura per la chiave k passano prima per il nodo n che le fa attendere, di sicuro non giungeranno al current(k).
E' possibile che queste richieste avvengano in modo circoscritto ad un g-nodo di appartenenza di n, se il servizio come detto prima decide di dare ad alcune chiavi un effetto di visibilità locale del dato.
E' possibile che la richiesta per la chiave k restituisca una eccezione PeersNoParticipantsInNetworkError. Ad esempio se il servizio è opzionale e nessuno dei nodi già esistenti (eventualmente nel g-nodo in cui la ricerca è stata circoscritta) vi partecipa. In questo caso il nodo corrente, il quale partecipa al servizio, si può ritenere esaustivo per k. Quindi, a fronte di questa eccezione, il nodo associa alla chiave k nella sua memoria il record di default per tale chiave.
Esaustività del nodo servente
Esaminiamo ora cosa avviene per le richieste, di qualsiasi tipo, che il nodo n riceve da altri.
Quando il nodo n riceve una richiesta r per una chiave k, il suo comportamento dipende da due fattori:
Lo stato dell'operazione iniziale di recupero del record per la chiave k. Può essere completato oppure no.
Il tipo della richiesta r. Può essere di sola lettura oppure no.
Se la richiesta è di sola lettura e il recupero non è ancora completato, allora la rifiuta immediatamente come non esaustivo, di modo che sarà servita dal precedente detentore.
A causa di questo comportamento, un client di questo servizio deve sapere che può ricevere una eccezione PeersDatabaseError. Non dovrà interpretarla con un "record non trovato" ma dovrà riprovare a fare la richiesta dopo un po' di tempo: come detto prima gli unici esiti per una richiesta di lettura sono "NO_PARTICIPANTS" e "OK".
Se la richiesta non è di sola lettura e il recupero non è ancora completato, allora la mette in attesa finché non ha completato o, al massimo, fino quasi ad esaurire il tempo previsto per l'esecuzione della richiesta (timeout_exec) e poi da istruzioni al client di riavviare da capo il calcolo distribuito di Ht, rilanciando una eccezione PeersRedoFromStartError. Questo perché comunque potrebbe non essere più lui il miglior candidato.
Se il recupero del record è stato completato, qualunque sia il tipo della richiesta, la serve immediatamente.
Per gestire questo aspetto il nodo n fa uso di alcune strutture dati memorizzate nella classe DatabaseHandler:
Elenco di chiavi List<Object> not_completed_keys.
Se una chiave k è nell'elenco allora il nodo sa che il recupero del relativo record non è ancora stato completato.
Algoritmi
La classe del servizio deve fornire, oltre alle operazioni previste in tutti i servizi che implementano un database distribuito, queste ulteriori operazioni:
List<Object> get_full_key_domain(): Restituire la lista di tutte le possibili chiavi nel dominio. Preferibilmente tale elenco deve essere ordinato per avere per prime le chiavi che eventualmente prevedono una visibilità locale.
Object get_default_record_for_key(Object k): Restituire una istanza (da mettere in memoria) come valore di default per la chiave k, avendo come requisito (cioè il chiamante deve garantirlo) che tale chiave è del dominio del servizio, come indicato dal metodo precedente. Questa funzione deve garantire di essere atomica, cioè di non schedulare altre tasklet.
Il modulo fornisce l'interfaccia IFixedKeysDatabaseDescriptor. Essa estende l'interfaccia IDatabaseDescriptor ed espone anche i metodi sopra descritti: get_full_key_domain e get_default_record_for_key.
La classe che implementa il servizio nel suo costruttore crea una istanza di IFixedKeysDatabaseDescriptor che userà in tutte le chiamate a questi algoritmi, che sono metodi di PeersManager. Subito richiamerà il metodo fixed_keys_db_on_startup. In seguito per ogni richiesta che riceve richiamerà il metodo fixed_keys_db_on_request.
Algoritmo all'avvio:
void fixed_keys_db_on_startup(IFixedKeysDatabaseDescriptor fkdd, int p_id, int level_new_gnode)
assert: services.has_key(p_id).
- In una nuova tasklet:
srv = services[p_id].
fkdd.dh = new DatabaseHandler().
fkdd.dh.p_id = p_id.
fkdd.dh.ready = False.
Se srv.is_optional:
Mentre not participant_maps_retrieved:
Se participant_maps_failed:
- Termina l'algoritmo.
- Aspetta 1 secondo.
fkdd.dh.not_completed_keys = new ArrayList<Object>(fkdd.key_equal_data).
fkdd.dh.retrieving_keys = new HashMap<Object,INtkdChannel>(fkdd.key_hash_data, fkdd.key_equal_data).
List<Object> k_set = fkdd.get_full_key_domain().
Per ogni chiave Object k in k_set:
Mette k in fkdd.dh.not_completed_keys.
fkdd.dh.ready = True.
wait_before_network_activity = False.
Per ogni chiave Object k in k_set:
h_p_k = fkdd.evaluate_hash_node(k).
l = h_p_k.size.
Se level_new_gnode ≥ l:
Esegui fkdd.set_record_for_key(k, fkdd.get_default_record_for_key(k)).
Rimuove k da fkdd.dh.not_completed_keys.
- Altrimenti:
Se wait_before_network_activity:
- Attendi 200 millisecondi.
Esegue fixed_keys_db_start_retrieve(fkdd, k). Cioè avvia in una tasklet il recupero del record per la chiave k.
wait_before_network_activity = True.
Algoritmo alla ricezione della richiesta:
IPeersResponse fixed_keys_db_on_request(IFixedKeysDatabaseDescriptor fkdd, IPeersRequest r, int common_lvl) throws PeersRefuseExecutionError, PeersRedoFromStartError
Gli argomenti del metodo fixed_keys_db_on_request sono:
fkdd: istanza di una classe che implementa IFixedKeysDatabaseDescriptor sopra descritta.
r: la richiesta.
common_lvl: il livello del minimo comune g-nodo con il richiedente, da 0 a levels compresi.
Se fkdd.dh = null o not fkdd.dh.ready:
Rilancia PeersRefuseExecutionError.NOT_EXHAUSTIVE.
- L'algoritmo termina.
Se fkdd.is_read_only_request(r):
Object k = fkdd.get_key_from_request(r).
Se k non è in fkdd.dh.not_completed_keys:
assert: fkdd.my_records_contains(k).
Elabora la richiesta di lettura di k. Ottiene la risposta da restituire. res = fkdd.execute(r).
- L'algoritmo termina.
- Altrimenti:
Rilancia PeersRefuseExecutionError.NOT_EXHAUSTIVE.
- L'algoritmo termina.
Se r is RequestWaitThenSendRecord:
Object k = r.k.
Se k è in fkdd.dh.not_completed_keys:
Rilancia PeersRefuseExecutionError.NOT_EXHAUSTIVE.
- L'algoritmo termina.
assert: fkdd.my_records_contains(k).
Calcola δ il tempo critico di coerenza, basandosi sul numero approssimato di nodi nel minimo comune g-nodo tra il nodo corrente e quello del richiedente, cioè il proprio g-nodo di livello common_lvl.
Attende δ o al massimo RequestWaitThenSendRecord.timeout_exec - 1000.
assert: fkdd.my_records_contains(k).
ret = new RequestWaitThenSendRecordResponse().
ret.record = fkdd.get_record_for_key(k).
Return ret.
- L'algoritmo termina.
Se fkdd.is_update_request(r):
Object k = fkdd.get_key_from_request(r).
Se k non è in fkdd.dh.not_completed_keys:
assert: fkdd.my_records_contains(k).
Elabora la richiesta di modifica per k. Ottiene la risposta da restituire. res = fkdd.execute(r).
- L'algoritmo termina.
- Altrimenti:
Mentre la chiave k non è ancora nell'elenco fkdd.dh.retrieving_keys:
- Aspetta 200 millisecondi.
Recupera il canale di comunicazione ch = fkdd.dh.retrieving_keys[k].
Aspetta sul canale di comunicazione ch fino a un massimo di fkdd.get_timeout_exec(r) - 1000.
Rilancia PeersRedoFromStartError.
- L'algoritmo termina.
Se fkdd.is_replica_value_request(r):
Object k = fkdd.get_key_from_request(r).
Elabora la richiesta di replica di valorizzazione per k. Ottiene la risposta da restituire. res = fkdd.execute(r).
Restituisce res. L'algoritmo termina.
- Nessuno dei casi precedenti.
Elabora la richiesta r. Ottiene la risposta da restituire. res = fkdd.execute(r).
Risponde con res.
- L'algoritmo termina.
Algoritmo di avvio del recupero (in una nuova tasklet) del record per la chiave k:
internal void fixed_keys_db_start_retrieve(IFixedKeysDatabaseDescriptor fkdd, Object k)
Crea un canale di comunicazione ch.
Mette la chiave k nell'elenco fkdd.dh.retrieving_keys, associandogli il canale ch.
- In una nuova tasklet:
Avvia il procedimento di recupero del record per la chiave k.
Si tratta di una richiesta di sola lettura (con attesa del tempo di coerenza) in cui il nodo n è il client e si auto-esclude come servente.
- Gli esiti possono essere:
Una istanza res di RequestWaitThenSendRecordResponse.
- Significa che abbiamo recuperato il record.
L'eccezione PeersNoParticipantsInNetworkError o una istanza di una classe inattesa.
- Significa che va usato come record il valore di default.
Object? record = null.
IPeersRequest r = new RequestWaitThenSendRecord().
timeout_exec = RequestWaitThenSendRecord.timeout_exec.
- Mentre True:
- Try:
PeerTupleNode respondant = null.
h_p_k = fkdd.evaluate_hash_node(k).
Esegue IPeersResponse res = contact_peer(fkdd.dh.p_id, h_p_k, r, timeout_exec, True, out respondant).
Se res è un RequestWaitThenSendRecordResponse:
record = res.record.
- Esce dal ciclo.
Se riceve PeersNoParticipantsInNetworkError:
- Esce dal ciclo.
Se riceve PeersDatabaseError:
- Aspetta 200 millisecondi. Riproverà alla prossima iterazione del ciclo.
- Try:
Se record ≠ null e fkdd.is_valid_record(k, record):
Esegui fkdd.set_record_for_key(k, record).
- Altrimenti:
Esegui fkdd.set_record_for_key(k, fkdd.get_default_record_for_key(k)).
INtkdChannel temp_ch = fkdd.dh.retrieving_keys[k].
Esegui fkdd.dh.retrieving_keys.unset(k).
Invia un messaggio asincrono (senza schedulare altre tasklet) a tutti quelli che sono in attesa su temp_ch.
Rimuove k da fkdd.dh.not_completed_keys.