1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
// Copyright (c) 2018-2023, agnos.ai UK Ltd, all rights reserved.
//---------------------------------------------------------------

use {
    crate::{DataStore, DataStoreConnection, ServerConnection},
    ::r2d2::{ManageConnection, Pool},
    rdf_store_rs::RDFStoreError,
    std::sync::{
        atomic::{AtomicBool, Ordering},
        Arc,
    },
};

/// A pool-able connectable [`DataStore`]
pub struct ConnectableDataStore {
    data_store:                Arc<DataStore>,
    server_connection:         Arc<ServerConnection>,
    /// Indicates that we want to release all connections on return to the pool
    /// (used to shutdown gracefully)
    release_on_return_to_pool: AtomicBool,
}

impl ConnectableDataStore {
    /// release_on_return_to_pool: Mark connection as "destroy" when return back
    /// to pool
    pub fn new(
        data_store: &Arc<DataStore>,
        server_connection: &Arc<ServerConnection>,
        release_on_return_to_pool: bool,
    ) -> Self {
        Self {
            data_store:                data_store.clone(),
            server_connection:         server_connection.clone(),
            release_on_return_to_pool: AtomicBool::new(release_on_return_to_pool),
        }
    }

    /// Build an `r2d2::Pool` for the given `DataStore` and `ServerConnection`
    pub fn build_pool(self) -> Result<Pool<ConnectableDataStore>, RDFStoreError> {
        let cds = Pool::builder()
            .max_size(self.server_connection.get_number_of_threads()?)
            .build(self)?;
        Ok(cds)
    }
}

impl ManageConnection for ConnectableDataStore {
    type Connection = Arc<DataStoreConnection>;
    type Error = RDFStoreError;

    fn connect(&self) -> Result<Self::Connection, Self::Error> {
        self.server_connection
            .connect_to_data_store(&self.data_store)
    }

    fn is_valid(&self, _conn: &mut Self::Connection) -> Result<(), Self::Error> { Ok(()) }

    fn has_broken(&self, _conn: &mut Self::Connection) -> bool {
        self.release_on_return_to_pool.load(Ordering::Relaxed)
    }
}