<?php
/*
Distributed mysql table storage
See homebase\db\Dt
key is scalar or hash
Config:
krdb.$namespace.mysql-fields - space-delimited field list (or array)
- these fields are saved as mysql fields
krdb.$namespace.shard-key - (optional) when defined, $key is treated as a hash and $hash[$shard_key] is used as the dt shard key
- (default) $key is scalar and the shard key is "ID"
// TODO - cache successfully loaded records, then use update instead of upsert (avoid check record lookup)
*/
/**
 * KRDB storage backend on top of \hb\db\DistributedTable.
 *
 * A record is written as a row holding the key field(s), an "updated" timestamp,
 * any fields listed in the "krdb.$namespace.mysql-fields" config, and a "data"
 * blob containing every remaining field serialized with igbinary + zstd.
 *
 * The instance remembers which keys it has loaded so that save() can issue
 * an update instead of an upsert for records known to exist.
 */
class KRDB_Storage_DT implements iKRDB_Storage {

    // keep track of loaded records
    private $is_loaded = []; // namespace => \hb\misc\CappedHash

    /**
     * Namespace to DT mapping.
     *
     * @param string $namespace
     * @return \hb\db\DistributedTable  table registered as "dt.$namespace"
     */
    protected function M($namespace) { # \hb\db\DistributedTable
        return \hb\db\DistributedTable::i("dt.$namespace");
    }

    /**
     * Shard-key value for a record key.
     *
     * A scalar key is its own shard key. For a hash key the name of the shard
     * field is read from config and looked up inside the key; an alert is logged
     * when the resulting value is missing or falsy.
     *
     * NOTE(review): the file header documents "krdb.$namespace.shard-key", while
     * this method reads "distributed-table.dt.$namespace.shard-key" - confirm
     * which one is authoritative.
     *
     * @param string $namespace
     * @param mixed  $key  scalar key or hash
     * @return mixed shard-key value (null when the hash has no such field)
     */
    public function shard_key($namespace, $key) { # shard-key value
        if (! is_array($key))
            return $key;
        $shard_key = C("distributed-table.dt.$namespace.shard-key");
        // "?? null": a hash without the shard field must reach the alert below
        // instead of raising an "undefined index" notice first.
        $sk = $key[$shard_key] ?? null;
        if (! $sk)
            \Log::alert("no shard key in key:".json_encode($key));
        return $sk;
    }

    /**
     * For overloading
     * This version of KRDB_Storage_DT supports DB_Parallel calls.
     * The body is intentionally empty here (returns null); load() treats a
     * falsy result as "no data".
     * @reimp
     * @param $namespace
     * @param $key
     * @param string $fields
     * @param DB_Parallel|null $DB_Parallel
     * @return array
     * @throws DB_Parallel_Exception
     */
    public function _load($namespace, $key, $fields="*", DB_Parallel &$DB_Parallel = null) {
    }

    /**
     * Load a record and return its parsed data.
     *
     * With a DB_Parallel that is not complete yet, the raw result of _load() is
     * returned untouched. Otherwise the row is post-processed: the "data" blob is
     * unserialized and merged in (explicit row fields win), "id" is dropped and
     * "updated" is renamed to "_updated".
     *
     * @param $namespace
     * @param $key
     * @param DB_Parallel|null $DB_Parallel
     * @return array  [] for an empty key; the key itself (hash key) or [] (scalar key) when no row was found
     */
    public function load($namespace, $key, DB_Parallel &$DB_Parallel = null) { # parsed data
        if (! $key) {
            Profiler::alert("KRDB::DT::load", "Error - Empty KEY Load, namespace=$namespace");
            return [];
        }
        if ($DB_Parallel) {
            $label = $DB_Parallel->isComplete() ? "KRDB::DT::load/read" : "KRDB::DT::load/request";
        } else {
            $label = "KRDB::DT::load";
        }
        Profiler::in($label, [$namespace, $key]);
        try {
            $D = $this->_load($namespace, $key, "*", $DB_Parallel);
            if ($DB_Parallel && !$DB_Parallel->isComplete()) {
                // parallel request still pending - nothing to parse yet
                return $D;
            }
            if (!$D) {
                Profiler::info("NO DATA");
                return is_array($key) ? $key : [];
            }
            $this->loaded($namespace, $key, true); // notify about loaded key
            if (isset($D["data"])) {
                $d = $D["data"];
                unset($D["data"]);
                if ($d) {
                    $SERIALIZER = i('serializer', ["method" => "igbinary", "compress" => 'zstd']);
                    // "+" keeps values already present in the row over blob values
                    $D = $D + $SERIALIZER->out($d);
                }
            }
            unset($D["id"]); // we already have PK in "key"
            if (isset($D["updated"])) {
                $D["_updated"] = $D["updated"];
                unset($D["updated"]);
            }
            if (!$D) {
                Profiler::info("EMPTY DATA");
            }
            return $D;
        } finally {
            // always close the profiler frame, also when _load() throws
            Profiler::out();
        }
    }

    /**
     * Actual save.
     *
     * Fields listed in "krdb.$namespace.mysql-fields" are written as separate row
     * fields; everything else is serialized into "data". Issues an update when
     * the key is known to be loaded, an upsert otherwise.
     *
     * @param KRDB $K  record to persist (reads ->namespace->namespace, ->key, ->_D())
     */
    public function save(/*KRDB*/ $K) {
        $namespace = $K->namespace->namespace;
        $key = $K->key;
        $D = $K->_D();
        Profiler::in("KRDB::DT::save", [$namespace, $key]);
        try {
            if (is_array($key))
                $mysql_fields = $key + ["updated" => time()];
            else
                $mysql_fields = ["id" => $key, "updated" => time()];
            if ($field_map = CC("krdb.$namespace.mysql-fields")) {
                foreach (qw($field_map) as $f) {
                    if (isset($D[$f])) {
                        // move the field out of the blob into its own row field
                        $mysql_fields[$f] = $D[$f];
                        unset($D[$f]);
                    }
                }
            }
            $SERIALIZER = i('serializer', ["method" => "igbinary", "compress" => 'zstd']);
            $mysql_fields["data"] = $SERIALIZER->in($D);
            $sk = $this->shard_key($namespace, $key);
            if ($this->loaded($namespace, $key)) { // key exists !!
                $this->M($namespace)->update($sk, $key, $mysql_fields);
            } elseif (is_array($key)) {
                $this->M($namespace)->upsert($sk, $mysql_fields, $key); // insert/update
            } else {
                $this->M($namespace)->upsert($sk, $mysql_fields); // insert/update
            }
        } finally {
            // always close the profiler frame, also when the write throws
            Profiler::out();
        }
    }

    /**
     * Completely delete KEY.
     * Never call directly - use KRDB::i("ns", $id)->delete();
     *
     * @param $namespace
     * @param $key
     */
    public function delete($namespace, $key) {
        Profiler::in_off("KRDB::DT::delete", [$namespace, $key]);
        try {
            $this->loaded($namespace, $key, false);
            $this->M($namespace)->delete($this->shard_key($namespace, $key), $key);
        } finally {
            // always close the profiler frame, also when the delete throws
            Profiler::out();
        }
    }

    /**
     * Query or change the "loaded" mark of a key.
     *
     * load = null  - is_loaded check, returns the stored mark
     * load = true  - notify about load, returns 1
     * load = false - unnotify, returns nothing
     *
     * Hash keys are identified by their json_encode() form.
     *
     * @param $namespace
     * @param $key
     * @param bool|null $load
     * @return mixed
     */
    public function loaded($namespace, $key, $load=null) {
        if (is_array($key))
            $key = json_encode($key);
        if (! ($this->is_loaded[$namespace]??0))
            $this->is_loaded[$namespace] = \hb\misc\CappedHash::i(); // 10K cap (per original note)
        $L = & $this->is_loaded[$namespace];
        if ($load === null)
            return $L[$key];
        if ($load)
            return $L[$key]=1;
        unset($L[$key]);
    }
}