1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
use std::net::SocketAddr;
use kudu_pb::master::{TabletLocationsPB, TabletLocationsPB_ReplicaPB as ReplicaPB};
use Partition;
use PartitionSchema;
use RaftRole;
use Result;
use Schema;
use TabletId;
use TabletServerId;
use dns;
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Tablet {
id: TabletId,
partition: Partition,
replicas: Vec<Replica>,
}
impl Tablet {
pub fn id(&self) -> TabletId {
self.id
}
pub fn partition(&self) -> &Partition {
&self.partition
}
pub fn replicas(&self) -> &[Replica] {
&self.replicas
}
#[doc(hidden)]
pub fn from_pb(primary_key_schema: &Schema,
partition_schema: PartitionSchema,
mut pb: TabletLocationsPB)
-> Result<Tablet> {
let id = try!(TabletId::parse_bytes(pb.get_tablet_id()));
let partition = try!(Partition::from_pb(primary_key_schema,
partition_schema,
pb.take_partition()));
let mut replicas = Vec::with_capacity(pb.get_replicas().len());
for replica in pb.take_replicas().into_iter() {
replicas.push(try!(Replica::from_pb(replica)));
}
Ok(Tablet {
id: id,
partition: partition,
replicas: replicas,
})
}
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Replica {
id: TabletServerId,
rpc_addrs: Vec<(String, u16)>,
resolved_rpc_addrs: Vec<SocketAddr>,
role: RaftRole,
is_local: bool,
}
impl Replica {
pub fn id(&self) -> &TabletServerId {
&self.id
}
pub fn rpc_addrs(&self) -> &[(String, u16)] {
&self.rpc_addrs
}
pub fn role(&self) -> RaftRole {
self.role
}
pub fn resolved_rpc_addrs(&self) -> &[SocketAddr] {
&self.resolved_rpc_addrs
}
#[doc(hidden)]
pub fn from_pb(mut pb: ReplicaPB) -> Result<Replica> {
let id = try!(TabletServerId::parse_bytes(pb.get_ts_info().get_permanent_uuid()));
let mut rpc_addrs = Vec::with_capacity(pb.get_ts_info().get_rpc_addresses().len());
for mut host_port in pb.mut_ts_info().take_rpc_addresses().into_iter() {
let port = host_port.get_port() as u16;
rpc_addrs.push((host_port.take_host(), port));
}
let resolved_rpc_addrs = dns::resolve_hostports(&rpc_addrs);
let role = RaftRole::from_pb(pb.get_role());
let is_local = resolved_rpc_addrs.iter().any(|addr| dns::is_local_addr(&addr.ip()));
Ok(Replica {
id: id,
rpc_addrs: rpc_addrs,
resolved_rpc_addrs: resolved_rpc_addrs,
role: role,
is_local: is_local,
})
}
}