» /rd/lib.framework/KRDB/Storage/DT.php

<?php

/*
    Distributed mysql table storage
    See homebase\db\Dt

    key is scalar or hash

    Config:
        krdb.$namespace.mysql-fields - space delimited field list (or array)
                                     - this fields will be saved as mysql fields

        krdb.$namespace.shard-key    - (optional) when defined $key is treated as hash, $hash[$shard_key] used as a dt shard key
                                     - (default) $key - is scalar and shard-key is "ID"

  // TODO - cache successfully loaded records, then use update instead of upsert (avoid check record lookup)

*/

class KRDB_Storage_DT implements iKRDB_Storage {


    
# namespace to DT mapping
    
protected function M($namespace) { # \hb\db\DistributedTable
        
return \hb\db\DistributedTable::i("dt.$namespace");
    }

    function 
shard_key($namespace$key) { # shard-key value
        
if (! is_array($key))
            return 
$key;
        
#static $shard_key;
        #if (! $shard_key)
        
$shard_key C("distributed-table.dt.$namespace.shard-key");
        
$sk $key[$shard_key];
        if (! 
$sk)
            
\Log::alert("no shard key in key:".json_encode($key));
        return 
$sk;
    }

    
/**
     * For overloading
     * This version of KRDB_Storage_DT supports DB_Parallel calls.
     * @reimp
     * @param $namespace
     * @param $key
     * @param string $fields
     * @param DB_Parallel|null $DB_Parallel
     * @return array
     * @throws DB_Parallel_Exception
     */
    
function _load($namespace$key$fields="*"DB_Parallel &$DB_Parallel null) {

        return $this->M($namespace)->select_one($this->shard_key($namespace$key), $key$fields$DB_Parallel); # shard-key, $id (using "id" as a shard key)

    
}

    
/**
     * @param $namespace
     * @param $key
     * @param DB_Parallel|null $DB_Parallel
     * @return array
     */
    
PUBLIC function load($namespace$keyDB_Parallel &$DB_Parallel null) {  # parsed data
        
if (! $key) {
            
Profiler::alert("KRDB::DT::load""Error - Empty KEY Load, namespace=$namespace");
            return [];
        }
        if (
$DB_Parallel) {
            if (
$DB_Parallel->isComplete()) {
                
Profiler::in("KRDB::DT::load/read", [$namespace$key]);
            } else {
                
Profiler::in("KRDB::DT::load/request", [$namespace$key]);
            }
        } else {
            
Profiler::in("KRDB::DT::load", [$namespace$key]);
        }

        
$D $this->_load($namespace$key"*"$DB_Parallel);
        if (!
$DB_Parallel || $DB_Parallel->isComplete()) {

            if (!
$D) {
                
Profiler::info("NO DATA");
                
Profiler::out();
                return 
is_array($key) ? $key : [];
            }

            
$this->loaded($namespace$keytrue); // notify about loaded key

            
if (isset($D["data"])) {
                
$d = @$D["data"];
                unset(
$D["data"]);
                if (
$d) {
                    
$SERIALIZER i('serializer', ["method" => "igbinary""compress" => 'zstd']);
                    
$D $D $SERIALIZER->out($d);//igbinary_unserialize(gzinflate($d));
                
}
            }

            unset(
$D["id"]); // we already have PK in "key"
            
if (isset($D["updated"])) {
                
$D["_updated"] = $D["updated"];
                unset(
$D["updated"]);
            }
            if (!
$D) {
                
Profiler::info("EMPTY DATA");
            }
        }

        
Profiler::out();
        return 
$D;
    }


    
// actual save
    
PUBLIC function save(/*KRDB*/ $K) {
        
$namespace $K->namespace->namespace;
        
$key $K->key;
        
$D $K->_D();
        
Profiler::in("KRDB::DT::save", [$namespace$key]);
        if (
is_array($key))
            
$mysql_fields $key + ["updated" => time()];
        else
            
$mysql_fields = ["id" => $key"updated" => time()];
        if (
$field_map CC("krdb.$namespace.mysql-fields")) {
            foreach (
qw($field_map) as $f) {
                if (isset(
$D[$f])) {
                    
$mysql_fields[$f] = $D[$f];
                    unset(
$D[$f]);
                }
            }
        }
        
$SERIALIZER i('serializer', ["method" => "igbinary""compress" => 'zstd']);
        
$mysql_fields["data"] = $SERIALIZER->in($D);//gzdeflate(igbinary_serialize($D));

        
$sk $this->shard_key($namespace$key);
        if (
$this->loaded($namespace$key)) // key exists !!
            
$this->M($namespace)->update($sk$key$mysql_fields);
        else {
            if (
is_array($key))
                
$this->M($namespace)->upsert($sk$mysql_fields$key); // insert/update
            
else
                
$this->M($namespace)->upsert($sk$mysql_fields); // insert/update
        
}
        
Profiler::out();
    }

    
// completely delete KEY
    // never call directly - use KRDB::i("ns", $id)->delete();
    
function delete($namespace$key) {
        
Profiler::in_off("KRDB::DT::delete", [$namespace$key]);
        
$this->loaded($namespace$keyfalse);
        
$this->M($namespace)->delete($this->shard_key($namespace$key), $key);
        
Profiler::out();
    }

    
// keep track of loaded records
    
private $is_loaded = []; // namespace => CappedSet
    // load = null  = is_loaded check
    // load = true  = notify about load
    // load = false = unnotify
    
function loaded($namespace$key$load=null) {
        if (
is_array($key))
            
$key json_encode($key);
        if (! (
$this->is_loaded[$namespace]??0))
            
$this->is_loaded[$namespace] = \hb\misc\CappedHash::i(); // 10K cap
        
$L = & $this->is_loaded[$namespace];
        if (
$load === null)
            return 
$L[$key];
        if (
$load)
            return 
$L[$key]=1;
        unset(
$L[$key]);
    }

}