<?php
namespace hb\db;
/*
*/
/*
Distributed Table = Sharded+Replicated Mysql Table
Data is spread over several mysql tables (shards).
We recommend using 3+ servers for production.
Data is duplicated at least once.
All reads go to the primary location.
If the primary shard is down, the backup location is used.
All writes go to both the primary and the backup locations.
Example:
$dt = \hb\db\DistributedTable::i("dt.profile");
$dt->select(1,['uk' => 2]); // shard-key = 1, uk = 2
Config:
distributed-table:
server-down: [server, server, ...]
schema.table:
# no-backup: 1 ## IF you do not want backup
max-shards: XX
primary:
server: [shard, shard-shard, ..]
backup:
server: [shard, shard-shard, ..]
primary-down:
[shard, shard-shard]
backup-down:
[shard, shard-shard]
uk: "field1 field2" // optional - default "id". unique key components
Must have fields:
updated_at (filled on update, insert)
Check out dt command line utility:
./dt --_setup='`id` int(11) NOT NULL, `data` blob NOT NULL, `key` int(11) NOT NULL, PRIMARY KEY(`id`)' -t dt.test
TODO:
* ez shard migration (server -> server)
* sync_queue parsing
* Archive node support
FUTURE:
* sync queue in redis
* never wait for backup - delayed backup sync-queue in redis
* do not wait for mysql -
write-through cache for primary and backup
*
*/
/**
 * Public contract of a distributed (sharded + replicated) table.
 *
 * In every method, $key is the shard key. The implementation in this file
 * (DistributedTable) maps it to a shard number and routes the call to that
 * shard's primary and/or backup location.
 */
interface iDistributedTable
{
// returns a cached instance per "schema.table" name
static function i($schema_table); # self instance !!
// CRUD
// key - shard_key, id - primary key
// optional $DB_Parallel is forwarded to the DB engine for parallel execution
function select_one($key, $id, $fields="*", \DB_Parallel &$DB_Parallel = null); # {field:value, ...}
// key - shard_key, where
function select($key, array $where, $fields="*", \DB_Parallel &$DB_Parallel = null); # [{field:value, ...}, ...]
// INSERT .. (key, data)
// use inserter for mass inserts
function insert($key, array $data);
// insert or update, "id" required inside data
function upsert($key, array $data);
// $id: primary key value (DistributedTable also accepts a where-hash)
function update($key, $id, array $data);
function delete($key, $id);
// used for BatchInserts
// you can use only one options for same fieldset
function inserter($fields, array $options=[]); # DT_Batch_Inserter object
function updater(array $options=[]); # DT_Batch_Updater object
function admin(); # DistributedTable_Admin object
}
/**
 * Raised by DistributedTable for shard-level failures (e.g. "SHARD-DOWN").
 * Code comments next to some throw sites ask that their messages stay unchanged -
 * callers presumably match on them.
 */
class DistributedTableException extends \Exception
{
}
/**
 * Distributed (sharded + replicated) MySQL table.
 *
 * Rows are spread over $max_shards tables named "schema.table_$shard" (see _t()).
 * Each shard has a primary location (alias in $s2p). Unless "no-backup" is configured,
 * it also has a backup copy in "schema_b.table_$shard" (see _bt()) on the alias in $s2b.
 * Reads go to the primary and fall back to the backup. Writes go to both.
 * A failed write is recorded via DBE_Failover in the queue of the other location.
 */
class DistributedTable implements iDistributedTable
{
    public $st;           // "schema.table" as passed to i()/__construct
    public $schema;       // "schema" part of $st
    public $table;        // "table" part of $st
    public $no_backup;    // if 1 = no backups !! (config "no-backup")
    public $max_shards;   // config "max-shards"; shard numbers are 1..max_shards
    public $primary_down; // shard => 1 - primary location considered down
    public $backup_down;  // shard => 1 - backup location considered down
    public $uk;           // (optional) config value -
                          // Unique Key - space delimited field list
    public $s2p;          // shard => primary alias
    public $s2b;          // shard => backup alias ("" when the table has no backup)
    public $exclude_sync; // C("exclude-sync") - exclude from `dt --sync` command
    public $server_down;  // (array) list of down servers
    const CONFIG_CACHE_TTL = 60; // parsed config cache time to live (apcu TTL, seconds)

    // current shard - read only (set by dbe_shard())
    public $dbe_shard;
    // current shard fail status - read only: bit 1 = primary down, bit 2 = backup down
    public $dbe_fail;

    // Cached configuration (see getConfig())
    private $config;
    // true right after the config was rebuilt (not read from apcu); init() then validates it
    private $config_must_be_validated = false;
    // fields => DT_Batch_Inserter (see Inserter())
    private $inserter;
    // start time of the current operation, set by iStatStart()
    private $istat_time = 0;

    // ------------------ ------------------ ------------------ ------------------
    // PUBLIC/EXTERN functions

    // INSTANTIATE VIA
    //   table: schema.table
    //   actual data is stored in "schema.table_$shard"
    //   config kept in C("distributed-table.schema.table")
    static function i($table) { # DistributedTable
        static $cache;
        if (empty($cache[$table]))
            $cache[$table] = new static($table);
        return $cache[$table];
    }

    function __toString() {
        return get_class($this)."($this->st) Shards: $this->max_shards";
    }

    /**
     * Select one record from DT.
     * @param $key shard key
     * @param $uk  unique key - (int)ID or (hash)WHERE
     * @param string $fields
     * @return array {field:value, ...} or [] when nothing was found
     */
    function select_one($key, $uk, $fields="*", \DB_Parallel &$DB_Parallel = null) { # {field:value, ...}
        \Profiler::in("DT::one");
        $wh = is_array($uk) ? $uk : ["id" => $uk];
        $wh["_limit"] = 1;
        $d = $this->select($key, $wh, $fields, $DB_Parallel);
        \Profiler::out([$this->st, $key, "uk" => $uk, "exists" => (bool) $d]);
        return $d ? reset($d) : [];
    }

    /**
     * Select from the shard owning $key: primary, or backup if the primary is down or throws.
     * @param $key - shard_key
     * @param array $where k=>value or "mysql_expression"
     * @param string $fields
     * @return array [{field:value, ...}, ...]
     */
    function select($key, array $where, $fields="*", \DB_Parallel &$DB_Parallel = null) {
        $r = [];
        [$DBE, $table] = $this->_dbe($key);
        $this->iStatStart();
        if ($DB_Parallel) {
            if ($DB_Parallel->isComplete()) {
                \Profiler::in("DT::select read", ["DB_Parallel" => "complete"]);
            } else {
                \Profiler::info("DT::select request", ["DB_Parallel" => "incomplete", "key" => $key]);
            }
        } else {
            \Profiler::in("DT::select");
        }
        try {
            $r = $DBE->select("$fields from $table", $where, "all_hash", $DB_Parallel);
            $this->iStat("select");
        } catch (\Exception $ex) {
            // primary query failed at runtime - retry once on the backup location
            $this->_fail_warn($ex);
            $r = $this->_dbe_b()->select("$fields from " . $this->_bt(), $where, "all_hash", $DB_Parallel);
            $this->iStat("select", ['fail' => 1]);
        }
        // Profiler::in() is only opened for non-parallel or completed parallel reads
        if (!$DB_Parallel || $DB_Parallel->isComplete()) {
            \Profiler::out([$this->st, $key, $where, "count" => is_array($r)?count($r):($r?1:0)]);
        }
        return $r;
    }

    /**
     * INSERT IGNORE .. (key, data) into primary and backup shard.
     * A failing location gets a queue entry through DBE_Failover instead.
     * @param int|array $key shard key
     * @param array $data field => value
     */
    function insert($key, array $data) {
        \Profiler::in("DT::insert", [$this->st, $key]);
        $this->dbe($key);
        try {
            $this->iStatStart();
            $this->_dbe_p()->insert($this->_t(), $data, [], "ignore");
            $this->iStat("insert");
        } catch (\Exception $ex) {
            $this->_fail_warn($ex);
            $f = new DBE_Failover($this);
            $f->insert($this->_t(), $data, [], "ignore");
        }
        try {
            $this->iStatStart();
            $this->_dbe_b()->insert($this->_bt(), $data, [], "ignore");
            $this->iStat("insert", ['backup' => 1]);
        } catch (\Exception $ex) {
            $this->_fail_warn($ex);
            $f = new DBE_Failover($this, true);
            $f->insert($this->_bt(), $data, [], "ignore");
        }
        \Profiler::out();
    }

    /**
     * Update row(s) on primary and backup shard.
     * @param $key - shard key
     * @param $uk - (int)ID or (hash)WHERE
     * @param array $data field => new value
     */
    function update($key, $uk, array $data) {
        $wh = is_array($uk) ? $uk : ["id" => $uk];
        \Profiler::in("DT::update", [$this->st, $key, $wh]);
        $this->dbe($key);
        try {
            $this->iStatStart();
            $this->_dbe_p()->update($this->_t(), $wh, $data);
            $this->iStat("update");
        } catch (\Exception $ex) {
            $this->_fail_warn($ex);
            $f = new DBE_Failover($this);
            $f->update($this->_t(), $wh, $data);
        }
        try {
            $this->iStatStart();
            $this->_dbe_b()->update($this->_bt(), $wh, $data);
            $this->iStat("update", ['backup' => 1]);
        } catch (\Exception $ex) {
            // the BACKUP failed: the sync task must be queued on the primary (backup=true)
            $this->_fail_warn($ex);
            $f = new DBE_Failover($this, true);
            $f->update($this->_bt(), $wh, $data);
        }
        \Profiler::out();
    }

    /**
     * insert or update
     * NOTE(review): check-then-write (select_one, then update/insert) is not atomic.
     * @param $key - shard key
     * @param array $data
     * @param null $uk (int)ID or (hash)WHERE; defaults to $data["id"]
     */
    function upsert($key, array $data, $uk=null) {
        if ($uk===null)
            $uk = ($data["id"]??0);
        if (! $uk)
            $this->alert("UK(unique key) required");
        if ($this->select_one($key, $uk, "1")) {
            $this->update($key, $uk, $data);
            return;
        }
        return $this->insert($key, $data);
    }

    /**
     * Delete row(s) on primary and backup shard.
     * @param $key - shard key
     * @param $uk - (int)ID or (hash)WHERE
     */
    function delete($key, $uk) {
        $wh = is_array($uk) ? $uk : ["id" => $uk];
        \Profiler::in("DT::delete", [$this->st, "wh" => $wh]);
        $this->dbe($key);
        try {
            $this->iStatStart();
            $this->_dbe_p()->delete($this->_t(), $wh);
            $this->iStat("delete");
        } catch (\Exception $ex) {
            $this->_fail_warn($ex);
            $f = new DBE_Failover($this);
            $f->delete($this->_t(), $wh);
        }
        try {
            $this->iStatStart();
            $this->_dbe_b()->delete($this->_bt(), $wh);
            $this->iStat("delete", ['backup' => 1]);
        } catch (\Exception $ex) {
            // the BACKUP failed: the sync task must be queued on the primary (backup=true)
            $this->_fail_warn($ex);
            $f = new DBE_Failover($this, true);
            $f->delete($this->_bt(), $wh);
        }
        \Profiler::out();
    }

    // make sure you have a lot of memory
    // inserter does simple escaping only - check source code
    // used for BatchInserts; one inserter per $fields - options of later calls are ignored
    function Inserter($fields, array $options=[]) { # DT_Batch_Inserter
        if (! isset($this->inserter[$fields])) {
            if ($this->no_backup)
                $options["no-backup"] = 1;
            $this->inserter[$fields] = new DT_Batch_Inserter($this, $fields, $options);
        }
        return $this->inserter[$fields];
    }

    // used for BatchUpdates - a new updater on every call
    function Updater(array $options=[]) { # DT_Batch_Updater
        return new DT_Batch_Updater($this, $options);
    }

    function Admin() { # DistributedTable_Admin
        return new DistributedTable_Admin($this);
    }

    function Archive(array $config=[]) { # DistributedTable_Archive
        return new DistributedTable_Archive($this, $config);
    }

    // execute sql statement over all shards
    // %s is replaced with the shard's schema.table
    // selects are executed on primary shards only; anything else on primary AND backup shards
    // Example:
    //   $this->dt->exec("select count(*) cnt from %s")
    //   ./dt -t dt.teaser --sql='show index from %s'
    public /* ADMIN */ function exec($sql, $echo = false) { # {shard => result}
        if (strtolower(substr(trim($sql),0, 6)) == 'select')
            return $this->p_exec($sql, $echo);
        if ($echo)
            echo "PRIMARY SHARDS:";
        $r = $this->p_exec($sql, $echo); # we still want to see some - ex: show
        if ($echo)
            echo "BACKUP SHARDS:";
        $this->b_exec($sql, $echo);
        return $r;
    }

    // select from multiple records (from different shards)
    // use with caution, better avoid it
    //
    // key_id: array of [Key, ID]
    // you can use any group field instead of ID, in this case -
    //   specify $query_field and pass array of [Key, Query_Field]
    // important: values are converted to integers - never use it on non-numeric fields
    //
    // implementation:
    //   system will issue one select per shard: select $fields from shard where $query_field in (....)
    //   reads use the backup location when a shard's primary is down
    public function selectMul(array $key_id, $fields="*", $query_field="id") { # [results]
        $r = [];
        foreach ($this->_KeyId_to_Shard2Ids($key_id) as $shard => $ids) {
            // read path: backup is only checked when the primary is down (same as select())
            $this->dbe_shard($shard, 0);
            [$DBE, $table] = $this->_dbe();
            // values are interpolated into SQL - force them numeric as documented above
            $_ids = join(",", array_map('intval', $ids));
            $r = array_merge($r, (array) $DBE->select("$fields from $table", ["$query_field in ($_ids)"]));
        }
        return $r;
    }

    // update multiple shards
    // key_id: array of [Key, group_field]
    // you can use any group field instead of ID, in this case -
    //   specify $query_field and pass array of [Key, Query_Field]
    //
    // implementation:
    //   system will issue one update per shard: update shard set ... where $query_field in (....)
    //   throws as soon as a shard has a down location; earlier shards stay updated
    public function updateMul(array $key_id, $data, $group_field="id") { #
        $s2ids = $this->_KeyId_to_Shard2Ids($key_id); // shard => [ids, ...]
        foreach ($s2ids as $shard => $ids) {
            $this->dbe_shard($shard);
            $_ids = join(",", $ids);
            $wh = ["$group_field in ($_ids)"];
            if ($this->dbe_fail)
                throw new DistributedTableException("Multiple Update impossible: SHARD-DOWN"); // do not change message!
            $this->_dbe_p()->update($this->_t(), $wh, $data);
            $this->_dbe_b()->update($this->_bt(), $wh, $data);
        }
    }

    // delete on multiple shards
    // see updateMul
    public function deleteMul(array $key_id, $group_field="id") { #
        $s2ids = $this->_KeyId_to_Shard2Ids($key_id); // shard => [ids, ...]
        foreach ($s2ids as $shard => $ids) {
            $this->dbe_shard($shard);
            $_ids = join(",", $ids);
            $wh = ["$group_field in ($_ids)"];
            $this->_dbe_p()->delete($this->_t(), $wh);
            $this->_dbe_b()->delete($this->_bt(), $wh);
        }
    }

    // ------------------ ------------------ ------------------ ------------------
    // -- Admin / Maintenance

    // maintenance / test function - do not use
    function selectBackup($key, array $where, $fields="*") { # [{field:value, ...}, ...]
        $this->dbe($key);
        return $this->_dbe_b()->select("$fields from ".$this->_bt(), $where);
    }

    // maintenance / test function - do not use
    function select_pri($key, array $where, $fields="*") { # [{field:value, ...}, ...]
        $this->dbe($key);
        return $this->_dbe_p()->select("$fields from ".$this->_t(), $where);
    }

    // run select on specific shard
    // i("dt-pfl")->_select(1, [], "count(*)")
    function _select($shard, array $where, $fields="*") { # [{field:value, ...}, ...]
        return $this->dbe_shard($shard)->_dbe_p()->select("$fields from ".$this->_t(), $where);
    }

    // make sure uk is array
    // make sure both shards online
    // usage: $uk = _syncPrepare($shard, $uk)
    private function _syncPrepare($shard, $uk) { # $uk
        if (! is_array($uk))
            $uk = ['id' => $uk];
        $this->dbe_shard($shard);
        if ($this->dbe_fail & 1)
            throw new DistributedTableException("SHARD-DOWN"); // do not change message!
        if ($this->dbe_fail & 2)
            throw new DistributedTableException("SHARD-DOWN");
        return $uk;
    }

    // sync row from primary to backup (only if the primary copy is newer by `updated`)
    function syncPriBackup($shard, /*array*/ $uk) { #
        $uk = $this->_syncPrepare($shard, $uk);
        $pk = $this->_dbe_p()->pk($this->_t());
        if (!is_assoc($uk))
            $uk = array_combine($pk, $uk);
        // get source
        $data = $this->_dbe_p()->select("* from ".$this->_t(), $uk);
        $data = reset($data);
        if (! $data)
            throw new DistributedTableException("No data from uk=".json_encode($uk));
        // update or insert only if older
        $check = $this->_dbe_b()->select("updated from ".$this->_bt(), $uk);
        $check = reset($check);
        if (!$check) {
            $this->_dbe_b()->insert($this->_bt(), $data);
        } elseif ($check['updated'] < $data['updated']) {
            $this->_dbe_b()->update($this->_bt(), $uk, $data);
        } else {
            $msg = "Update timestamp is not newer uk=".json_encode($uk)." source=".$data['updated']." dest=".$check['updated'];
            throw new DistributedTableException($msg);
        }
    }

    // sync row from backup to primary (only if the backup copy is newer by `updated`)
    function syncBackupPri($shard, /*array*/ $uk) { #
        $uk = $this->_syncPrepare($shard, $uk);
        // the backup server holds the backup table (_bt), so read its PK from there
        $pk = $this->_dbe_b()->pk($this->_bt());
        if (!is_assoc($uk))
            $uk = array_combine($pk, $uk);
        $data = $this->_dbe_b()->select("* from ".$this->_bt(), $uk);
        $data = reset($data);
        if (! $data)
            throw new DistributedTableException("No data from uk=".json_encode($uk));
        $check = $this->_dbe_p()->select("updated from ".$this->_t(), $uk);
        $check = reset($check);
        if (!$check) {
            $this->_dbe_p()->insert($this->_t(), $data);
        } elseif ($check['updated'] < $data['updated']) {
            $this->_dbe_p()->update($this->_t(), $uk, $data);
        } else {
            $msg = "Update timestamp is not newer uk=".json_encode($uk)." source=".$data['updated']." dest=".$check['updated'];
            throw new DistributedTableException($msg);
        }
    }

    // delete record @ backup, that was deleted from PRI
    // does nothing if there is a record in PRI
    function syncPriBackupDelete($shard, /*array*/ $uk) { #
        $uk = $this->_syncPrepare($shard, $uk);
        if (!is_assoc($uk)) {
            $pk = $this->_dbe_p()->pk($this->_t());
            $uk = array_combine($pk, $uk);
        }
        $data = $this->_dbe_p()->select("* from ".$this->_t(), $uk);
        if ($data)
            return;
        $this->_dbe_b()->delete($this->_bt(), $uk);
    }

    // delete record @ pri, that was deleted from BAK
    // does nothing if there is a record in BAK
    function syncBackupPriDelete($shard, /*array*/ $uk) { #
        $uk = $this->_syncPrepare($shard, $uk);
        if (!is_assoc($uk)) {
            // the backup server holds the backup table (_bt), so read its PK from there
            $pk = $this->_dbe_b()->pk($this->_bt());
            $uk = array_combine($pk, $uk);
        }
        $data = $this->_dbe_b()->select("* from ".$this->_bt(), $uk);
        if ($data)
            return;
        $this->_dbe_p()->delete($this->_t(), $uk);
    }

    // ------------------ ------------------ ------------------ ------------------
    // MED Level

    function s2p($shard) { return ($this->s2p[$shard]??null); }
    function s2b($shard) { return ($this->s2b[$shard]??null); }

    // st = "schema.table_prefix"
    // actual table names are: "schema.table_prefix"."_".$shard
    // backup tables live in "schema_b" with the same table names
    // NEVER instantiate directly
    // use I("instance") or self::i("table")
    // Ex: i('dt-teaser') || i('dt', 'street_loc')
    /*protected*/ function __construct($st) {
        if (is_array($st)) { // support for I("instance")
            if (!empty($st['table']))
                $st = $st["table"];
            elseif (!empty($st['_']))
                $st = "dt.".$st['_'];
        }
        $this->st = $st;
        [$this->schema, $this->table] = explode('.', $st);
        $this->init();
    }

    // Figure out shard from a KEY
    function shard($key) { # (int) shard : 1..max_shards
        return crc32($key) % $this->max_shards + 1;
    }

    // init class from (apcu-cached) config; $reset forces a rebuild
    function init($reset=0) { #
        $c = $this->getConfig($reset);
        $this->s2p = $c["s2p"];
        $this->s2b = ($c["s2b"]??"");
        $this->no_backup = $c["no-backup"] ?? null;
        $this->uk = $c["uk"] ?? null; // unique key components - space delimited field list
        $this->max_shards = $c["max-shards"];
        $this->exclude_sync = $c["exclude-sync"] ?? null;
        $this->server_down = (array) ($c["server-down"] ?? []);
        $this->primary_down = self::parseRangesH($c["primary-down"]??null, 1); // [shard => 1]
        $this->backup_down = self::parseRangesH($c["backup-down"]??null, 1); // [shard => 1]
        // every shard hosted on a down server is marked down as well
        foreach ($this->server_down as $server) {
            foreach ($this->s2p as $shard => $ser) {
                if ($ser == $server)
                    $this->primary_down[$shard] = 1;
            }
            // s2b is "" for "no-backup" tables - iterate an empty list then
            foreach ($this->s2b ?: [] as $shard => $ser) {
                if ($ser == $server)
                    $this->backup_down[$shard] = 1;
            }
        }
        if ($this->config_must_be_validated)
            $this->admin()->check_shard_config(); // checking shard config every CONFIG_CACHE_TTL
    }

    // parsed config, cached in $this->config and in apcu under "dt:<st>" for CONFIG_CACHE_TTL
    public function getConfig($reset = 0) {
        if (!$this->config || $reset) {
            $KEY = "dt:".$this->st;
            if ($reset) {
                apcu_delete($KEY);
            }
            $this->config = apcu_fetch($KEY);
            if (! $this->config) {
                $this->config = static::_init($this->st);
                apcu_store($KEY, $this->config, static::CONFIG_CACHE_TTL);
                $this->config_must_be_validated = true;
            } else {
                $this->config_must_be_validated = false;
            }
        }
        return $this->config;
    }

    // parse number ranges from array of strings "1","3","5-10","50-60" => [1,3,5,6,7,8.....]
    static function parseRanges(/*array*/ $s) { # array ranges
        if (! $s)
            return [];
        $r = [];
        foreach ($s as $v) {
            if (strpos($v, "-")) {
                [$start, $end]=explode("-", $v);
                foreach (range($start, $end) as $i)
                    $r[] = $i;
                continue;
            }
            $r[] = $v;
        }
        return $r;
    }

    // parse ranges from array of strings "1","5-10","50-60" to hash
    // => [1 => $SET, 5 => $SET, 6 => $SET, ....]
    static function parseRangesH(/*array*/ $s, $SET) { # [item => $SET]
        if (! $s)
            return [];
        $r = [];
        foreach ($s as $v) {
            if (strpos($v, "-")) {
                [$start, $end]=explode("-", $v);
                foreach (range($start, $end) as $i)
                    $r[$i] = $SET;
                continue;
            }
            $r[$v] = $SET;
        }
        return $r;
    }

    // non cached init: read C("distributed-table.<st>") and build s2p / s2b maps
    static function _init($st) { # config hash
        $C = C("distributed-table.$st");
        if (! $C)
            \Log::alert("no config for distributed table $st");
        $C["s2p"] = self::__init_shard_to_alias($C["primary"]);
        if (! isset($C["no-backup"]))
            $C["s2b"] = self::__init_shard_to_alias($C["backup"]);
        if ($v = CC("distributed-table.server-down"))
            $C["server-down"] = $v;
        unset($C["primary"]);
        unset($C["backup"]);
        return $C;
    }

    // alias => [shard-shard, shard, ..] to shard => alias (first alias wins on overlap)
    static function __init_shard_to_alias($alias2shards) { # { shard => alias }
        $c = [];
        foreach ((array)$alias2shards as $alias => $shards) {
            $c += self::parseRangesH($shards, $alias);
        }
        return $c;
    }

    // Convert list of ($key_id = [ShardKey, Id]) to "shard" => [Id]
    // used to convert several same-shard selects into one
    // Id can be anything you want - ID or your data
    // hint: use dbe_shard to refer to correct shard
    /* private */ function _KeyId_to_Shard2Ids(array $key_id) { # shard => [id,...]
        $k2s = [];   // cache: key => shard
        $s2ids = []; // shard => [id, ...]
        foreach ($key_id as [$key, $id]) {
            // shard the key exactly as dbe()/shard() do, so rows are looked up where they were written
            $s = $k2s[$key] ?? ($k2s[$key] = $this->shard($key));
            $s2ids[$s][] = $id;
        }
        return $s2ids;
    }

    // ------------------ ------------------ ------------------ ------------------
    // DBE LEVEL

    /**
     * used by selects only
     * check alive status - return corresponding dbe
     * @param $key - if $key is null then no "switch to shard" logic will be used.
     *   Could be used this way: i("dt", "xxxxxxx")->dbe_shard(1)->_dbe();
     * @return array [$dbe, $table, $fail] - backup dbe/table when the primary is down
     */
    function _dbe($key = null) { # [dbe, table, primary-fail]
        if (null !== $key) {
            $this->dbe($key, 0); // do not check backup
        }
        $fail = $this->dbe_fail == 1; // primary-fail
        $dbe = $fail ? $this->_dbe_b() : $this->_dbe_p();
        $table = $fail ? $this->_bt() : $this->_t();
        return [$dbe, $table, $fail];
    }

    // check shard statuses, init vars, return $this
    function dbe($key, $check_backup=1) { # $this
        return $this->dbe_shard($this->shard($key), $check_backup);
    }

    // $s = shard
    // check_backup - suppress backup check if primary is ok - can do it ONLY! with selects
    // throws DistributedTableException when both locations are down
    function dbe_shard($s, $check_backup=1) { # $this
        $fail = (int) isset($this->primary_down[$s]) | (((int) isset($this->backup_down[$s])) << 1);
        if (\DB_Engine::isDead($this->s2p[$s]))
            $fail |= 1;
        if (($fail || $check_backup) && \DB_Engine::isDead($this->s2b[$s]))
            $fail |= 2;
        switch ($fail) {
            case 3:
                throw new DistributedTableException($this->st." shard $s is down pri:".$this->s2p[$s]." backup:".$this->s2b[$s]);
            case 2:
                \Profiler::warn("Dt/Failover (backup) ", $this->s2b[$s]." down.");
                break;
            case 1:
                \Profiler::warn("Dt/Failover (primary)", $this->s2p[$s]." down. using ".$this->s2b[$s]);
                break;
        }
        $this->dbe_shard = $s;
        $this->dbe_fail = $fail; // bit_1 = primary fail, bit_2 - secondary fail
        return $this;
    }

    // SHOULD ONLY BE CALLED after previous dbe call
    // primary dbe or failover replacement
    function _dbe_p() { # DB_Engine or DBE_Failover($this)
        if ($this->dbe_fail==1)
            return new DBE_Failover($this);
        $a = $this->s2p[$this->dbe_shard];
        if (! $a)
            \Log::alert("no DB for shard #".$this->dbe_shard);
        return DBE($a);
    }

    // SHOULD ONLY BE CALLED after previous dbe call
    // backup dbe or failover replacement (DBE_NoBackup for "no-backup" tables)
    function _dbe_b() { # DB_Engine or DBE_Failover($this, true)
        if ($this->no_backup)
            return new DBE_NoBackup();
        if ($this->dbe_fail==2)
            return new DBE_Failover($this, true);
        return DBE($this->s2b[$this->dbe_shard]);
    }

    // see p_exec
    // execute sql on current primary shard
    // %s is replaced by a shard table
    function _p_exec($sql) {
        return $this->_dbe_p()->run(sprintf($sql, $this->_t()));
    }

    // see b_exec
    // execute sql on current backup shard
    function _b_exec($sql) {
        return $this->_dbe_b()->run(sprintf($sql, $this->_bt()));
    }

    // ------------------ ------------------ ------------------ ------------------
    // LOW Level

    // warn about fail
    function _fail_warn($ex) {
        \Profiler::warn("Dt/Failover", $ex->getMessage());
        if ($cnt = once()) // we can recover, so we do not want to pollute logs
            \Log::warning("DT:".get_class($ex)." $cnt messages suppressed.\n Last message: ".$ex->getMessage()." trace: ".x2s($ex->getTrace()) );
    }

    // primary shard schema.table name: "<st>_<shard>"
    function _t($shard=false) { # schema.table_name
        if ($shard===false)
            $shard = $this->dbe_shard;
        return sprintf("%s_%d", $this->st, $shard);
    }

    // backup shard schema.table name: "<schema>_b.<table>_<shard>"
    function _bt($shard=false) { # schema.table_name
        if ($shard===false)
            $shard = $this->dbe_shard;
        return sprintf("%s_b.%s_%d", $this->schema, $this->table, $shard);
    }

    // execute sql on ALL primary shards (down-status checks are bypassed: dbe_fail = 0)
    // if callable then it called as $callable($data, $shard) after every shard
    public function p_exec($sql, $echo = false, $callable=null) { # { shard => $shard_data }
        $r = [];
        $this->dbe_fail = 0;
        $tm = microtime(true);
        foreach (range(1, $this->max_shards) as $shard) {
            if ($echo) {
                $tm = self::_echo_timing($tm);
                echo "shard $shard";
            }
            $this->dbe_shard = $shard;
            try {
                $z = $this->_p_exec($sql);
                $d = $z->fetchAll(\PDO::FETCH_ASSOC);
                if ($callable)
                    $d = $callable($d, $shard);
                $r[$shard] = $d;
            } catch(\Exception $ex) {
                $r[$shard] = "Error:".$ex->getMessage();
                echo "\n Error: ".$ex->getMessage()."\n";
            }
        }
        if ($echo)
            self::_echo_timing($tm);
        return $r;
    }

    // echo elapsed time since $old (only when > 0.9 sec), return current time
    static function _echo_timing($old) {
        $time = microtime(true);
        $tm = $time - $old;
        if ($tm>0.9)
            echo ": ".number_format($tm, 2)." sec\n";
        else
            echo "\n";
        return $time;
    }

    // execute sql on ALL backup shards (down-status checks are bypassed: dbe_fail = 0)
    public function b_exec($sql, $echo = false) { # [$shard => result]
        $r = [];
        $this->dbe_fail = 0;
        $tm = microtime(true);
        foreach (range(1, $this->max_shards) as $shard) {
            if ($echo) {
                $tm = self::_echo_timing($tm);
                echo "shard $shard"."_b";
            }
            $this->dbe_shard = $shard;
            try {
                $z = $this->_b_exec($sql);
                $r[$shard] = $z->fetchAll(\PDO::FETCH_ASSOC);
            } catch(\Exception $ex) {
                $r[$shard] = "Error:".$ex->getMessage();
                echo "\n Error: ".$ex->getMessage()."\n";
            }
        }
        if ($echo)
            self::_echo_timing($tm);
        return $r;
    }

    // LOW LEVEL SYSTEM
    // echo + \Log::alert the message
    // NOTE(review): this method does not exit by itself; whether \Log::alert dies is not visible here
    function alert($message) { #
        echo "DistributedTable:$this->st $message\n";
        \Log::alert("DistributedTable:$this->st $message");
    }

    function iStatStart() { #
        $this->istat_time = microtime(1);
    }

    // op - select, update, delete, insert
    // NOTE(review): the $hit counters are computed but not sent anywhere in this version
    function iStat($op, array $hit=[], $data=[]) { #
        static $dt_nodes = [
            'dt.pfl' => 'pfl',
            'dt.pfl2' => 'pfl2',
            'dt.profile2' => 'profile2',
            'dt.ext' => 'ext'
        ];
        $node = ($n = ($dt_nodes[$this->st]??null)) ? $n : "other";
        $this->iStatKeys(); // keeps the apcu key-list cache warm; the list itself is unused here
        $time = (microtime(1) - $this->istat_time) * 1000;
        $hit['time'] = $time;
        $hit['hit_'.$op] = 1;
        $hit['time_'.$op] = $time;
        $server = $this->s2p[$this->dbe_shard];
        if (($hit['fail']??0) || ($hit['backup']??0)) {
            $server = $this->s2b[$this->dbe_shard];
            if ($hit['fail']??0) {
                $hit['hit_fail'] = $hit['fail'];
                unset($hit['fail']);
                $hit['time_fail'] = $time;
            }
            if ($hit['backup']??0) {
                $hit['hit_backup'] = $hit['backup'];
                unset($hit['backup']);
                $hit['time_backup'] = $time;
            }
        } else {
            // hits and time per server are only for operations on primary nodes
            $hit["hit-".$server] = 1;
            $hit["time-".$server] =$time;
        }
        if ($op != 'select') {
            $hit['hit_modify'] = 1;
            $hit["time_modify"] =$time;
        }
        if (PHP_SAPI == 'cli') {
            $hit['hit_cli'] = 1;
            $hit['time_cli'] = $time;
        } else {
            $hit['hit_web'] = 1;
            $hit['time_web'] = $time;
        }
        // accesses to non-main dt by name
        if ($node == 'other') {
            $n = str_replace("dt.", "", $this->st);
            $hit['dt-'.$n] = 1;
        }
    }

    // cache iStat APC-hit keys in APC (600 sec); returns a space-delimited key list
    function iStatKeys() { # keys
        $K = "dt:".$this->st.".istat_keys"; // keys used
        $keys = apcu_fetch($K);
        if ($keys)
            return $keys;
        $pb = array_flip($this->s2p); // ONLY PRIMARY servers (alias => shard)
        $keys = [];
        foreach (qw("select update delete insert modify web cli backup fail") as $param) {
            $keys[] = "hit-$param"; // hits
            $keys[] = "time-$param"; // time
        }
        foreach (array_keys($pb) as $server) {
            $keys[] = "hit-$server"; // hits
            $keys[] = "time-$server"; // time-on-server
        }
        $keys = join(" ", $keys);
        apcu_store($K, $keys, 600);
        return $keys;
    }
}
/**
 * Stand-in DB engine for insert/update/delete when one location of a shard is failing.
 *
 * Instead of writing the row, it records a task in the `queue` table of the OTHER
 * location of the same shard, so the change can be re-applied later by a sync process:
 *   backup = false - "Primary Shard is Down": task goes to "<schema>_b.queue" via dt->_dbe_b()
 *   backup = true  - "Backup Shard is Down":  task goes to "<schema>.queue" via dt->_dbe_p()
 */
class DBE_Failover {
    public $dt;      // DistributedTable; its current shard (dt->dbe_shard) is the failing one
    public $backup;  // true when the failing location is the backup

    function __construct($dt, $backup=false) {
        $this->dt = $dt;
        $this->backup = $backup;
    }

    // $table is not used: the queue entry is identified by dt->st + shard
    function insert($table, $data) {
        $this->_save("insert", $data);
    }

    // only the where-hash ($q) is queued, not the new values ($data)
    function update($table, $q, $data) {
        $this->_save("update", $q);
    }

    function delete($table, $q) {
        $this->_save("delete", $q);
    }

    // write one task row into the alive location's queue table
    private function _save($op, $q) {
        $d = $this->backup ? $this->dt->_dbe_p() : $this->dt->_dbe_b();
        $schema = $this->backup ? $this->dt->schema : $this->dt->schema."_b";
        if ($this->dt->uk) {
            // composite unique key: JSON list of uk components in config order.
            // A component missing from $q (e.g. an update addressed only by "id") is stored
            // as null without raising an "undefined array key" warning.
            $uk = [];
            foreach (explode(" ", $this->dt->uk) as $field)
                $uk[] = $q[$field] ?? null;
            $uk = json_encode($uk);
        } else {
            $uk = $q["id"] ?? "";
        }
        $d->insert($schema.".queue",
            ["op" => $op,
             "st" => $this->dt->st,
             "shard" => $this->dt->dbe_shard,
             "uk" => NVL($uk, ""),
             "backup" => $this->backup ? 1: 0]);
    }

    // reads are not supported by the failover engine: log the caller, return nothing
    function select($what, $where) {
        \Log::notice("error: dt-failover-select ".$this->dt->st." ".x2s(Caller()));
    }
}
/**
 * Null-object DB engine: DistributedTable::_dbe_b() returns it for tables configured
 * with "no-backup", so every write to the backup location is silently dropped.
 * For development or brave people.
 */
class DBE_NoBackup {
    function __construct() {
    }

    // backup writes / statements are no-ops and return null
    function insert($table, $data) {
    }

    function update($table, $q, $data) {
    }

    function delete($table, $q) {
    }

    function run($sql) {
    }

    // a read landing here means there is nothing to fall back to
    function select($what, $where) {
        \Log::alert("No Backup & Primary Failed");
    }
}
/*
 internal class,
 never instantiate directly. use DistributedTable::i(..)->inserter
 make sure you have enough MEMORY (buffer size * $max_shards, per location)
 Batch Inserter only works when there are no DOWN nodes
*/
class DT_Batch_Inserter {
    protected $DT;            // owning DistributedTable
    protected $inserter = []; // "alias:shard:isBackup" => \DB_BatchInsert
    protected $fields = "";   // field list, split with qw() for DB_BatchInsert
    protected $dirty = 0;     // rows added since the last flush()
    public $options = [];     // options forwarded to DB_BatchInsert (extras removed)
    public $MAXLEN = 262144;  // 256K buffer per shard - use options[maxlen] to change
    public $backup = true;    // also write every row to the backup shard

    // options - options for BatchInsert plus some extras
    // extra options (removed before forwarding):
    //   maxlen    - change default buffer size (256k)
    //   no-backup - do not do backup - ADMIN USE ONLY
    function __construct($DT, $fields, array $options=[]) {
        $this->DT = $DT;
        $this->fields = $fields;
        $maxlen = hash_unset($options, "maxlen");
        if ($maxlen)
            $this->MAXLEN = $maxlen;
        if (hash_unset($options, "no-backup"))
            $this->backup = false;
        $this->options = $options;
    }

    // data - LIST of values to insert
    // insert + full escape (every value goes through \DB::quote)
    public function insert($key, array $data) {
        foreach (array_keys($data) as $k)
            $data[$k] = \DB::quote($data[$k]);
        $this->insert_no_escape($key, $data);
    }

    // you HAVE to escape!!
    // use on your own risk
    // row goes to the primary shard buffer, then (unless disabled) the backup one
    /* ADMIN */ function insert_no_escape($key, array $data) {
        $shard = $this->DT->shard($key);
        $this->dirty++;
        $this->_inserter($this->DT->s2p[$shard], $shard)->add_no_escape($data);
        if (! $this->backup)
            return;
        $this->_inserter($this->DT->s2b[$shard], $shard, true)->add_no_escape($data); // backup
    }

    // flush all unsaved changes
    public function flush() {
        foreach ($this->inserter as $batch)
            $batch->flush();
        $this->dirty = 0;
    }

    // lazily created buffer for (alias, shard, is-backup)
    protected function _inserter($a, $s, $b=false) { # Db_BatchInsert
        $K = "$a:$s:$b"; // :$b is excessive, but we are going for most common case
        if (empty($this->inserter[$K])) {
            $table = $b ? $this->DT->_bt($s) : $this->DT->_t($s);
            $this->inserter[$K] = new \DB_BatchInsert("$a:$table", qw($this->fields), $this->options, $this->MAXLEN);
        }
        return $this->inserter[$K];
    }

    // it is a really bad idea to rely on destructors for saving data
    // we'll flush it and add notice about bad code
    function __destruct() {
        if ($this->dirty) {
            $this->flush();
            echo "DT_Batch_Inserter Destructor. Missing flush.\n";
        }
    }
}
/*
 Issue SAME Update command on lots of shards
   update XXX set $command where UK in (...)
 Only PRIMARY shards are updated; rows on shards whose primary is marked down are skipped.
 Example:
   > DB_BatchUpdate::$debug=1
   > ; $PFL_BATCH_UPDATER =i("dt-pfl")->updater(["set" => "has=has|512",
                               "batch_size" => "10000",
                               "fields" => "lname_id,fname_id,region", # Shard Key is First Field, Important
                               "where" => "has & 512 = 0"])
     # ->update($uk); // UK - unique key !!! important: Shard Key must be $uk[0] !!!
   > $PFL_BATCH_UPDATER->update([8265,2108915,0])
   > $PFL_BATCH_UPDATER->flush()
   dt-b
   UPDATE dt.pfl_299
   SET has=has|512
   WHERE has & 512 = 0 AND (lname_id,fname_id,region) in ((8265,2108915,0))
   updater flush 1
   > i("dt-pfl")->select(8265, ["lname_id" => 8265, "fname_id" => 2108915, "region" => 0], "lname_id, fname_id, region, has")
   Array(
     0 =>
       fname_id => 2108915
       lname_id => 8265
       region => 0
       has => 545
   )
*/
class DT_Batch_Updater {
    protected $updater = []; // "alias:shard:isBackup" => \DB_BatchUpdate
    protected $dirty = 0;    // update() calls since the last flush()
    public $DT;              // DistributedTable
    public $options = [];    // BatchUpdate Options

    /*
     options (forwarded to \DB_BatchUpdate):
       fields
       set
       where
       batch_size - DEFAULT is 1000 (10 times less than usual BatchUpdate)
       no-backup  - NOTE(review): not read by this class; backup shards are never updated here
    */
    function __construct($DT, array $options) {
        $this->DT = $DT;
        $this->options = $options + ["batch_size" => 1000];
    }

    // IF UK is ARRAY = shard key MUST be first!!
    function update($uk) {
        $this->dirty++;
        $key = is_array($uk) ? $uk[0] : $uk;
        $shard = $this->DT->shard($key);
        // primary_down only lists the down shards (shard => 1), e.g. [120-130,451-475],
        // so a plain index would raise "undefined array key" for every healthy shard.
        if (!empty($this->DT->primary_down[$shard])) {
            // sad but we have to ignore down shards
            // have to specifically reimport them later
            return;
        }
        $alias = $this->DT->s2p[$shard];
        $this->_updater($alias, $shard)->update($uk); // pri
        // backup update is currently disabled:
        #if (empty($this->options["no-backup"])) {
        #    $b = $this->DT->s2b[$shard];
        #    $this->_updater($b, $shard, true)->update($uk); // backup
        #}
    }

    // flush all unsaved changes
    public function flush() {
        foreach ($this->updater as $as => $I)
            if ($I)
                $I->flush();
        $this->dirty = 0;
    }

    // lazily created batch updater for (alias, shard, is-backup)
    protected function _updater($a, $s, $b=false) { # Db_BatchUpdater
        $K = "$a:$s:$b"; // :$b is excessive, but we are going for most common case
        if ($i = ($this->updater[$K]??null))
            return $i;
        $table = $b ? $this->DT->_bt($s) : $this->DT->_t($s);
        $this->updater[$K] = new \DB_BatchUpdate("$a:$table", $this->options);
        return $this->updater[$K];
    }

    // it is a really bad idea to rely on destructors for saving data
    // we'll flush it and add notice about bad code
    function __destruct() {
        if (! $this->dirty)
            return;
        $this->flush();
        echo "DT_Batch_Updater. Missing flush.\n";
    }
}