» /rd/homebase/hb/db/DistributedTable.php

<?php

namespace hb\db;

/*


*/


/*
  Distributed Table = Sharded+Replicated Mysql Table

  Data is spread over several mysql tables (shards)
  We recommend to use 3+ servers for production.
  Data is duplicated at least once.

  All reads goes to primary location.
  If primary shard is down backup location is used

  All writes goes to primary and backup locations

Example:

  $dt = \hb\db\DistributedTable::i("dt.profile");
  $dt->select(1,['uk' => 2]);  //  shard-key = 1, uk = 2

Config:

distributed-table:
    server-down: [server, server, ...]
    schema.table:
        # no-backup: 1    ## IF you do not want backup
        max-shards: XX
        primary:
            server: [shard, shard-shard, ..]
        backup:
            server: [shard, shard-shard, ..]
        primary-down:
            [shard, shard-shard]
        backup-down:
            [shard, shard-shard]
        uk: "field1 field2"   // optional - default "id". unique key components

Must have fields:
    updated_at (filled on update, insert)

Check out dt command line utility:
    ./dt --_setup='`id` int(11) NOT NULL, `data` blob NOT NULL, `key` int(11) NOT NULL, PRIMARY KEY(`id`)' -t dt.test


TODO:

    * ez shard migration (server -> server)

    * sync_queue parsing

    * Archive node support

FUTURE:
    * sync queue in redis
    * never wait for backup - delayed backup sync-queue in redis
    * do not wait for mysql -
        write-through cache for primary and backup
    *

*/


interface iDistributedTable
{

    static function 
i($schema_table);  # self instance !!

    // CRUD

    // key - shard_key, id - primary key
    
function select_one($key$id$fields="*"\DB_Parallel &$DB_Parallel null);  # {field:value, ...}

    // key - shard_key, where
    
function select($key, array $where$fields="*"\DB_Parallel &$DB_Parallel null);  # [{field:value, ...}, ...]

    // INSERT .. (key, data)
    // use inserter for mass inserts
    
function insert($key, array $data);

    
// insert or update, "id" required inside data
    
function upsert($key, array $data);

    function 
update($key$id, array $data);

    function 
delete($key$id);

    
// used for BatchInserts
    // you can use only one options for same fieldset
    
function inserter($fields, array $options=[]); # DT_Batch_Inserter object
    
function updater(array $options=[]); # DT_Batch_Updater object

    
function admin(); # DistributedTableAdmin

}

class 
DistributedTableException extends \Exception {}

class 
DistributedTable implements iDistributedTable
{
    public 
$st;  // schema.table
    
public $schema;
    public 
$table;

    public 
$no_backup// if 1 = no backups !!

    
public $max_shards;

    public 
$primary_down;  // shard => true
    
public $backup_down;   // shard => true
    
public $uk;            // (optional) config value -
                           // Unique Key - space delimited field list

    
public $s2p;  // shard => primary alias
    
public $s2b;  // shard => backup alias

    
public $exclude_sync// C("exclude-sync") - exlude from `dt --sync` command
    
public $server_down;  // (array) list of down servers

    
const CONFIG_CACHE_TTL 60;  // parsed config cache time to live

    // current shard - read only
    
public $dbe_shard;
    
// current shard fail status - read only
    
public $dbe_fail;

    
// Cached configuration;
    
private $config;
    private 
$config_must_be_validated false;

    
// ------------------ ------------------ ------------------ ------------------
    // PUBLIC/EXTERN functions

    // INSTANTIATE VIA
    // table: schema.table
    // actual data is stored in "schema.table_$shard"
    // config kept in C("table")
    
static function i($table) { # DistributedTable
        
static $cache;
        if (empty(
$cache[$table]))
            
$cache[$table] = new static($table);
        return 
$cache[$table];
    }

    function 
__toString() {
        return 
get_class($this)."($this->st) Shards: $this->max_shards";
    }

    
/**
     * Select one record from DT.
     * @param $key (shard_key), id - primary key (or query hash)
     * @param $uk (unique key) - (int)ID or (hash)WHERE
     * @param string $fields
     * @return array
     */
    
function select_one($key$uk$fields="*"\DB_Parallel &$DB_Parallel null) {  # {field:value, ...}
        
\Profiler::in("DT::one");
        
$wh is_array($uk) ? $uk : ["id" => $uk];
        
$wh["_limit"] = 1;
        
$d $this->select($key$wh$fields$DB_Parallel);
        
\Profiler::out([$this->st$key"uk" => $uk"exists" => (bool) $d]);
        return 
$d reset($d) : [];
    }


    
/**
     * @param $key - shard_key
     * @param array|string $where k=>value or "mysql_expression"
     * @param string $fields
     * @return array [{field:value, ...}, ...]
     */
    
function select($key, array $where$fields="*"\DB_Parallel &$DB_Parallel null) {
        
$r = [];
        [
$DBE$table] = $this->_dbe($key);
        
$this->iStatStart();

        if (
$DB_Parallel) {
            if (
$DB_Parallel->isComplete()) {
                
\Profiler::in("DT::select read", ["DB_Parallel" => "complete"]);
            } else {
                
\Profiler::info("DT::select request", ["DB_Parallel" => "incomplete""key" => $key]);
            }
        } else {
            
\Profiler::in("DT::select");
        }

        try {
            
$r $DBE->select("$fields from $table"$where"all_hash"$DB_Parallel);
            
$this->iStat("select");
        } catch (
\Exception $ex) {
            
$this->_fail_warn($ex);
            
$r $this->_dbe_b()->select("$fields from " $this->_bt(), $where"all_hash"$DB_Parallel);
            
$this->iStat("select", ['fail' => 1]);
        }

        if (!
$DB_Parallel || $DB_Parallel->isComplete()) {
            
\Profiler::out([$this->st$key$where"count" => is_array($r)?count($r):($r?1:0)]);
        }

        return 
$r;
    }

    
/**
     * INSERT .. (key, data)
     * @param int|array $key
     * @param array $data
     */
    
function insert($key, array $data) {
        
#if (! isset($data["id"]))
        #    $this->alert("id required");
        
\Profiler::in("DT::insert", [$this->st$key]);
        
$this->dbe($key);
        try {
            
$this->iStatStart();
            
$this->_dbe_p()->insert($this->_t(), $data, [], "ignore");
            
$this->iStat("insert");
        } catch (
\Exception $ex) {
            
$this->_fail_warn($ex);
            
$f = new DBE_Failover($this);
            
$f->insert($this->_t(), $data, [], "ignore");
        }
        try {
            
$this->iStatStart();
            
$this->_dbe_b()->insert($this->_bt(), $data, [], "ignore");
            
$this->iStat("insert", ['backup' => 1]);
        } catch (
\Exception $ex) {
            
$this->_fail_warn($ex);
            
$f = new DBE_Failover($thistrue);
            
$f->insert($this->_bt(), $data, [], "ignore");
        }
        
\Profiler::out();
    }

    
/**
     * @param $key - unique key
     * @param $uk - (int)ID or (hash)WHERE
     * @param array $data
     */
    
function update($key$uk, array $data) {
        
$wh is_array($uk) ? $uk : ["id" => $uk];
        
\Profiler::in("DT::update", [$this->st$key$wh]);
        
$this->dbe($key);
        try {
            
$this->iStatStart();
            
$this->_dbe_p()->update($this->_t(), $wh$data);
            
$this->iStat("update");
        } catch (
\Exception $ex) {
            
$this->_fail_warn($ex);
            
$f = new DBE_Failover($this);
            
$f->update($this->_t(), $wh$data);
        }
        try {
            
$this->iStatStart();
            
$this->_dbe_b()->update($this->_bt(), $wh$data);
            
$this->iStat("update", ['backup' => 1]);
        } catch (
\Exception $ex) {
            
$this->_fail_warn($ex);
            
$f = new DBE_Failover($this);
            
$f->update($this->_bt(), $wh$data);
        }
        
\Profiler::out();
    }

    
/**
     * insert or update
     * @param $key - unique key
     * @param array $data
     * @param null $uk (int)ID or (hash)WHERE
     */
    
function upsert($key, array $data$uk=null) {
        if (
$uk===null)
            
$uk = ($data["id"]??0);
        if (! 
$uk)
            
$this->alert("UK(unique key) required");
        if (
$this->select_one($key$uk"1")) {
            
# unset($data["id"]);
            
$this->update($key$uk$data);
            return;
        }
        return 
$this->insert($key$data);
    }

    
/**
     * @param $key - unique key
     * @param $uk - (int)ID or (hash)WHERE
     */
    
function delete($key$uk) {
        
$wh is_array($uk) ? $uk : ["id" => $uk];
        
\Profiler::in("DT::delete", [$this->st"wh" => $wh]);
        
$this->dbe($key);
        try {
            
$this->iStatStart();
            
$this->_dbe_p()->delete($this->_t(), $wh);
            
$this->iStat("delete");
        } catch (
\Exception $ex) {
            
$this->_fail_warn($ex);
            
$f = new DBE_Failover($this);
            
$f->delete($this->_t(), $wh);
        }
        try {
            
$this->iStatStart();
            
$this->_dbe_b()->delete($this->_bt(), $wh);
            
$this->iStat("delete", ['backup' => 1]);
        } catch (
\Exception $ex) {
            
$this->_fail_warn($ex);
            
$f = new DBE_Failover($this);
            
$f->delete($this->_t(), $wh);
        }
        
\Profiler::out();
    }


    private 
$inserter;
    
// make sure you have a lot of memory
    // inserter does simle escaping only - check source code

    // used for BatchInserts
    
function Inserter($fields, array $options=[]) { # DT_Batch_Inserter
        
if (! isset($this->inserter[$fields])) {
            if (
$this->no_backup)
                
$options["no-backup"] = 1;
            
$this->inserter[$fields] = new DT_Batch_Inserter($this$fields$options);
        }
        return 
$this->inserter[$fields];
    }

    
// used for BatchUpdates
    
function Updater(array $options=[]) { # DT_Batch_Updater
        
return new DT_Batch_Updater($this$options);
    }

    function 
Admin() { # DistributedTable_Admin
        
return new DistributedTable_Admin($this);
    }

    function 
Archive(array $config=[]) { # DistributedTable_Archive
        
return new DistributedTable_Archive($this$config);
    }

    
// execute sql statement over all shards
    // %s is replaced with schema.table
    // selects are executed only one (primary or backup)
    // Example:
    //       $this->dt->p_exec("select count(*) cnt from %s
    //       ./dt -t dt.teaser --sql='show index from %s'
    
public /* ADMIN */ function exec($sql$echo false) { # {shard => result}
        
if (strtolower(substr(trim($sql),06)) == 'select')
            return 
$this->p_exec($sql$echo);
        if (
$echo)
            echo 
"PRIMARY SHARDS:";
        
$r $this->p_exec($sql$echo);  # we still want to see some - ex: show
        
if ($echo)
            echo 
"BACKUP SHARDS:";
        
$this->b_exec($sql$echo);
        return 
$r;
    }

    
// select from multiple records (from different shards)
    // use with caution, better avoid it
    //
    // key_id:  array of [Key, ID]
    // you can use any group field instead of ID, in this case -
    //    specify $query_field and pass array of [Key, Query_Field]
    //    important: $query_field will be converted to numeric - never use it on non-numeric fields
    //
    // implementation:
    //      system will issue one select per shard: select $fields from shard where $query_field in (....)
    //
    
public function selectMul(array $key_id$fields="*"$query_field="id") {  # [results]
        
$s2ids $this->_KeyId_to_Shard2Ids($key_id); # shard => IDS
        
$r = []; // ID 2 RESULT
        
foreach ($s2ids as $shard => $ids) {
            
$this->dbe_shard($shard);
            
$_ids join(","$ids);
            
$r array_merge($r$this->_dbe_p()->select("$fields from ".$this->_t(), ["$query_field in ($_ids)"]));
        }
        return 
$r;
    }

    
// update multiple shards
    // key_id:  array of [Key, group_field]
    // you can use any group field instead of ID, in this case -
    //    specify $query_field and pass array of [Key, Query_Field]
    //
    // implementation:
    //      system will issue one update per shard: update shard set ... where $query_field in (....)
    
public function updateMul(array $key_id$data$group_field="id") { #
        
$s2ids $this->_KeyId_to_Shard2Ids($key_id); // shard => [ids, ...]
        
foreach ($s2ids as $shard => $ids) {
            
$this->dbe_shard($shard);
            
$_ids join(","$ids);
            
$wh = ["$group_field in ($_ids)"];

            if (!
$this->dbe_fail) {
                
$this->_dbe_p()->update($this->_t(), $wh$data);
                
$this->_dbe_b()->update($this->_bt(), $wh$data);
            } else {
                throw new 
DistributedTableException("Multiple Update impossible: SHARD-DOWN"); // dp not change message!
            
}
        }
    }

    
// delete on multiple shards
    // see updateMul
    
public function deleteMul(array $key_id$group_field="id") { #
        
$s2ids $this->_KeyId_to_Shard2Ids($key_id); // shard => [ids, ...]
        
foreach ($s2ids as $shard => $ids) {
            
$this->dbe_shard($shard);
            
$_ids join(","$ids);
            
$wh = ["$group_field in ($_ids)"];
            
$this->_dbe_p()->delete($this->_t(), $wh);
            
$this->_dbe_b()->delete($this->_bt(), $wh);
       }
    }

    
// ------------------ ------------------ ------------------ ------------------
    // -- Admin / Maintenance

    // maintenance / test function - do not use
    
function selectBackup($key, array $where$fields="*") {  # [{field:value, ...}, ...]
        
$this->dbe($key);
        return  
$this->_dbe_b()->select("$fields from ".$this->_bt(), $where);
    }    
// maintenance / test function - do not use
    
function select_pri($key, array $where$fields="*") {  # [{field:value, ...}, ...]
        
$this->dbe($key);
        return  
$this->_dbe_p()->select("$fields from ".$this->_t(), $where);
    }

    
// run select on specific shard
    //  i("dt-pfl")->_select(1, [], "count(*)")
    
function _select($shard, array $where$fields="*") {  # [{field:value, ...}, ...]
        
return $this->dbe_shard($shard)->_dbe_p()->select("$fields from ".$this->_t(), $where);
    }

    
// make sure uk is array
    // make sure both shards online
    // usage: $uk = __syncPrepare($uk)
    
private function _syncPrepare($shard$uk) { # $uk
        
if (! is_array($uk))
            
$uk = ['id' => $uk];
        
$this->dbe_shard($shard);
        if (
$this->dbe_fail 1)
            throw new 
DistributedTableException("SHARD-DOWN"); // dp not change message!
        
if ($this->dbe_fail 2)
            throw new 
DistributedTableException("SHARD-DOWN");
        return 
$uk;
    }

    
// sync row from primary to backup
    
function syncPriBackup($shard/*array*/ $uk) { #
        
$uk $this->_syncPrepare($shard$uk);
        
$pk $this->_dbe_p()->pk($this->_t());
        if (!
is_assoc($uk))
            
$uk array_combine($pk$uk);

        
// get sourse
        
$data $this->_dbe_p()->select("* from ".$this->_t(), $uk);
        
$data reset($data);
        if (! 
$data)
            throw new 
DistributedTableException("No data from uk=".json_encode($uk));

        
// update or insert only if older
        
$check $this->_dbe_b()->select("updated from ".$this->_bt(), $uk);
        
$check reset($check);
        if (!
$check) {
            
$this->_dbe_b()->insert($this->_bt(), $data);
        } elseif (
$check['updated'] < $data['updated']) {
            
$this->_dbe_b()->update($this->_bt(), $uk$data);
        } else {
            
$msg "Update timestamp is not newer uk=".json_encode($uk)." source=".$data['updated']." dest=".$check['updated'];
            throw new 
DistributedTableException($msg);
        }
    }

    
// sync row from backup to primary
    
function syncBackupPri($shard/*array*/ $uk) { #
        
$uk $this->_syncPrepare($shard$uk);
        
$pk $this->_dbe_b()->pk($this->_t());
        if (!
is_assoc($uk))
            
$uk array_combine($pk$uk);

        
$data $this->_dbe_b()->select("* from ".$this->_bt(), $uk);
        
$data reset($data);
        if (! 
$data)
            throw new 
DistributedTableException("No data from uk=".json_encode($uk));

//         $this->_dbe_p()->insert($this->_t(), $data, $data);
        
$check $this->_dbe_p()->select("updated from ".$this->_t(), $uk);
        
$check reset($check);
        if (!
$check) {
            
$this->_dbe_p()->insert($this->_t(), $data);
        } elseif (
$check['updated'] < $data['updated']) {
            
$this->_dbe_p()->update($this->_t(), $uk$data);
        } else {
               
$msg "Update timestamp is not newer uk=".json_encode($uk)." source=".$data['updated']." dest=".$check['updated'];
            throw new 
DistributedTableException($msg);
        }
    }

    
// delete record @ backup, that deleted from PRI
    // does nothing if thereis a record in pri
    
function syncPriBackupDelete($shard/*array*/ $uk) { #
        
$uk $this->_syncPrepare($shard$uk);
        if (!
is_assoc($uk)) {
            
$pk $this->_dbe_p()->pk($this->_t());
            
$uk array_combine($pk$uk);
        }

        
$data $this->_dbe_p()->select("* from ".$this->_t(), $uk);
        if (
$data)
            return;
        
$this->_dbe_b()->delete($this->_bt(), $uk);
    }

    
// delete record @ pri, that deleted from BAK
    // does nothing if there is a record in BAK
    
function syncBackupPriDelete($shard/*array*/ $uk) { #
        
$uk $this->_syncPrepare($shard$uk);
        if (!
is_assoc($uk)) {
            
$pk $this->_dbe_b()->pk($this->_t());
            
$uk array_combine($pk$uk);
        }

        
$data $this->_dbe_b()->select("* from ".$this->_bt(), $uk);
        if (
$data)
            return;
        
$this->_dbe_p()->delete($this->_t(), $uk);
    }

    
// ------------------ ------------------ ------------------ ------------------
    // MED Level


    
function s2p($shard) { return ($this->s2p[$shard]??null); }
    function 
s2b($shard) { return ($this->s2b[$shard]??null); }

    
//st = "schema.table_prefix"
    // actual table names are: "schema.table_prefix"."_".$shard
    // both primary and backup databases have same names

    // NEVER instantiate directly
    // use I("instance") or self::i("table")
    // Ex: i('dt-teaser') || i('dt', 'street_loc')
    /*protected*/ 
function __construct($st) {
        if (
is_array($st)) {  // support for I("instance")
            
if (!empty($st['table']))
                
$st $st["table"];
            elseif (
$st['_'])
                
$st "dt.".$st['_'];
        }
        
$this->st $st;
        [
$this->schema$this->table] = explode('.'$st);
        
$this->init();
    }


    
// Figure out shard from a KEY
    
function shard($key) { # (int) shard : 1..max_shards
        
return crc32($key) % $this->max_shards 1;
    }

    
// init class from cache
    
function init($reset=0) { #

        
$c $this->getConfig($reset);

        
$this->s2p $c["s2p"];
        
$this->s2b = ($c["s2b"]??"");

        
$this->no_backup    $c["no-backup"] ?? null;
        
$this->uk           $c["uk"] ?? null// unique key components - space delimited field list
        
$this->max_shards   $c["max-shards"];
        
$this->exclude_sync $c["exclude-sync"] ?? null;
        
$this->server_down  = (array) ($c["server-down"] ?? []);
        
$this->primary_down self::parseRangesH($c["primary-down"]??null1); // [shard => 1]
        
$this->backup_down self::parseRangesH($c["backup-down"]??null1);   // [shard => 1]
        
foreach ($this->server_down as $server) {
            foreach (
$this->s2p as $shard => $ser) {
                if (
$ser == $server)
                    
$this->primary_down[$shard] = 1;
            }
            foreach (
$this->s2b as $shard => $ser) {
                if (
$ser == $server)
                    
$this->backup_down[$shard] = 1;
            }
        }

        if (
$this->config_must_be_validated)
            
$this->admin()->check_shard_config(); // checking shard config evert CONFIG_CACHE_TTL
    
}

    public function 
getConfig($reset 0) {
        if (!
$this->config || $reset) {
            
$KEY "dt:".$this->st;
            if (
$reset) {
                
apcu_delete($KEY);
            }

            
$this->config apcu_fetch($KEY);

            if (! 
$this->config) {
                
$this->config = static::_init($this->st);
                
apcu_store($KEY$this->config, static::CONFIG_CACHE_TTL);
                
$this->config_must_be_validated true;
            } else {
                
$this->config_must_be_validated false;
            }
        }

        return 
$this->config;
    }

    
// parse number ranges from array of strings "1","3","5-10","50-60" => [1,3,5,6,7,8.....]
    
static function parseRanges(/*array*/ $s) { # array ranges
        
if (! $s)
            return [];
        
$r = [];
        foreach (
$s as $v) {
            if (
strpos($v"-")) {
                [
$start$end]=explode("-"$v);
                foreach (
range($start$end) as $i)
                    
$r[] = $i;
                continue;
            }
            
$r[] = $v;
        }
        return 
$r;
    }

    
// parse ranges from array of strings "1",5-10","50-60" to hash
    //  => [1 => $V, 5 => $V, 5=> $V....]
    
static function parseRangesH(/*array*/ $s$SET) { # [item => $SET]
        
if (! $s)
            return [];
        
$r = [];
        foreach (
$s as $v) {
            if (
strpos($v"-")) {
                [
$start$end]=explode("-"$v);
                foreach (
range($start$end) as $i)
                    
$r[$i] = $SET;
                continue;
            }
            
$r[$v] = $SET;
        }
        return 
$r;
    }

    
// non cached init
    
static function _init($st) { # config hash
        
$C C("distributed-table.$st");
        if (! 
$C)
            
\Log::alert("no config for distributed table $st");
        
$C["s2p"] = self::__init_shard_to_alias($C["primary"]);
        if (! isset(
$C["no-backup"]))
            
$C["s2b"] = self::__init_shard_to_alias($C["backup"]);
        if (
$v CC("distributed-table.server-down"))
            
$C["server-down"] = $v;
        unset(
$C["primary"]);
        unset(
$C["backup"]);
        return 
$C;
    }

    
// alias => [shard-shard, shard, ..] to shard => alias
    
static function __init_shard_to_alias($alias2shards) { # { shard => alias }
        
$c = [];
        foreach ((array)
$alias2shards as $alias => $shards) {
            
$c += self::parseRangesH($shards$alias);
        }
        return 
$c;
    }

    
// Convert list of ($key_id = [ShardKey, Id]) to "shard" => [Id]
    // used to convert several same-shard selects into one
    // where can be anything u want - ID or your data
    // hint: use dbe_shard to refer to correct shard
    /* private */ 
function _KeyId_to_Shard2Ids(array $key_id) { # shard => [id,...]
        
$k2s = []; // cache(key => shard)
        
$s2ids = []; // key => where
        
foreach ($key_id as $ki) {
            [
$key$id] = $ki;
            
$key = (int) $key;
            
$s = ($k2s[$key]??"");
            if (! 
$s)
                
$s $k2s[$key] = $this->shard($key);
            
$s2ids[$s][] = $id;
        }
        return 
$s2ids;
    }

    
// ------------------ ------------------ ------------------ ------------------
    // DBE LEVEL

    //
    //
    //
    /**
     * used by selects only
     * check alive status - return corresponding dbe
     * @param $key - if $key is null then no "switch to shard" logic will be used.
     *               Could be used this way: i("dt", "xxxxxxx")->dbe_shard(1)->_dbe();
     * @return array [$dbe, $table, $fail]
     */
    
function _dbe($key null) { # [dbe, table, primary-fail]
        
if (null !== $key) {
            
$this->dbe($key0); // do not check backup
        
}
        
$fail  $this->dbe_fail == 1// primary-fail
        
$dbe   $fail $this->_dbe_b() : $this->_dbe_p();
        
$table $fail $this->_bt() : $this->_t();
        return [
$dbe$table$fail];
    }

    
// check shard statuses, init vars, return $this
    
function dbe($key$check_backup=1) {  # $this
        
return $this->dbe_shard($this->shard($key), $check_backup);
    }

    
// $s = shard
    // check_backup - suppress backup check if primary is ok - can do it ONLY! with selects
    
function dbe_shard($s$check_backup=1) {  # $this
        
$fail = (int) isset($this->primary_down[$s]) | (((int) isset($this->backup_down[$s])) << 1);
        if (
\DB_Engine::isDead($this->s2p[$s]))
            
$fail |= 1;
        if ((
$fail || $check_backup) && \DB_Engine::isDead($this->s2b[$s]))
            
$fail |= 2;
        switch (
$fail) {
            case 
3:
                throw new 
DistributedTableException($this->st." shard $s is down pri:".$this->s2p[$s]." backup:".$this->s2b[$s]);
                break;
            case 
2:
                
\Profiler::warn("Dt/Failover (backup) "$this->s2b[$s]." down.");
                break;
            case 
1:
                
\Profiler::warn("Dt/Failover (primary)"$this->s2p[$s]." down. using ".$this->s2b[$s]);
                break;
        }
        
        
$this->dbe_shard $s;
        
$this->dbe_fail $fail// bit_1 = primary fail, bit_2 - secondary fail
        
return $this;
    }


    
// SHOULD ONLY BE CALLED after previous dbe call
    // primary dbe or failover replacement
    
function _dbe_p() {  # DB_Engine or DBE_Failover($this)
        
if ($this->dbe_fail==1)
            return new 
DBE_Failover($this);
        
$a $this->s2p[$this->dbe_shard];
        if (! 
$a)
            
\Log::alert("no DB for shard #".$this->dbe_shard);
        return 
DBE($a);
    }

    
// SHOULD ONLY BE CALLED after previous dbe call
    // backup dbe or failover replacement
    
function _dbe_b() {  # DB_Engine or DBE_Failover($this, true)
        
if ($this->no_backup)
            return new 
DBE_NoBackup();
        if (
$this->dbe_fail==2)
            return new 
DBE_Failover($thistrue);
        return 
DBE($this->s2b[$this->dbe_shard]);
    }

    
// see p_exec
    // execute sql on current primary shard
    // %s is replaced by a shard table
    
function _p_exec($sql) {
        return 
$this->_dbe_p()->run(sprintf($sql$this->_t()));
    }

    
// see b_exec
    // execute sql on current backup shard
    
function _b_exec($sql) {
        return 
$this->_dbe_b()->run(sprintf($sql$this->_bt()));
    }

    
// ------------------ ------------------ ------------------ ------------------
    // LOW Level

    // warn about fail
    
function _fail_warn($ex) {
        
\Profiler::warn("Dt/Failover"$ex->getMessage());
        if (
$cnt once()) // we can recover, do we do not want to pollute logs
            
\Log::warning("DT:".get_class($ex).$cnt messages suppressed.\n Last message: ".$ex->getMessage()." trace: ".x2s($ex->getTrace()) );
    }

    
// primary shard schema.table name
    
function _t($shard=false) { # schema.table_name
        
if ($shard===false)
            
$shard $this->dbe_shard;
        return 
sprintf("%s_%d"$this->st$shard);
    }

    
// backup shard schema.table name
    
function _bt($shard=false) { # schema.table_name
        
if ($shard===false)
            
$shard $this->dbe_shard;
        return 
sprintf("%s_b.%s_%d"$this->schema$this->table$shard);
    }

    
// execute sql on ALL primary shards
    // if callable then it called as $callable($data, $shard) after every shard
    
public function p_exec($sql$echo false$callable=null) { # { shard => $shard_data }
        
$r = [];
        
$this->dbe_fail 0;
        
$tm microtime(true);
        foreach (
range(1$this->max_shards) as $shard) {
            if (
$echo) {
                
$tm self::_echo_timing($tm);
                echo 
"shard $shard";
            }
            
$this->dbe_shard $shard;
            try {
                
$z $this->_p_exec($sql);
                
$d $z->fetchAll(\PDO::FETCH_ASSOC);
                if (
$callable)
                    
$d $callable($d$shard);
                
$r[$shard] = $d;
            } catch(
\Exception $ex) {
               
$r[$shard] = "Error:".$ex->getMessage();
               echo 
"\n Error: ".$ex->getMessage()."\n";
            }
        }
        if (
$echo)
            
self::_echo_timing($tm);
        return 
$r;
    }

    static function 
_echo_timing($old) {
        
$time microtime(true);
        
$tm $time $old;
        if (
$tm>0.9)
            echo 
": ".number_format($tm2)." sec\n";
        else
            echo 
"\n";
        return 
$time;
    }

    
// execute sql on ALL backup shards
    
public function b_exec($sql$echo false) { # [$shard => result]
        
$r = [];
        
$this->dbe_fail 0;
        
$tm microtime(true);
        foreach (
range(1$this->max_shards) as $shard) {
            if (
$echo) {
                
$tm self::_echo_timing($tm);
                echo 
"shard $shard"."_b";
            }
            
$this->dbe_shard $shard;
            try {
                
$z $this->_b_exec($sql);
                
$r[$shard] = $z->fetchAll(\PDO::FETCH_ASSOC);
            } catch(
\Exception $ex) {
               
$r[$shard] = "Error:".$ex->getMessage();
               echo 
"\n Error: ".$ex->getMessage()."\n";
            }
        }
        if (
$echo)
            
self::_echo_timing($tm);
        return 
$r;
    }

    
// LOW LEVEL SYSTEM


    // alert message and DIE
    
function alert($message) { #
        
echo "DistributedTable:$this->st $message\n";
        
\Log::alert("DistributedTable:$this->st $message");
    }

    private 
$istat_time 0;
    function 
iStatStart() { #
        
$this->istat_time microtime(1);
    }

    
// op - select, update, delete, insert
    
function iStat($op, array $hit=[], $data=[]) { #
        
static $dt_nodes = [
            
'dt.pfl' => 'pfl',
            
'dt.pfl2' => 'pfl2',
            
'dt.profile2' => 'profile2',
            
'dt.ext' => 'ext'
        
];
        
$node = ($n = ($dt_nodes[$this->st]??null)) ? $n "other";
        
$keys $this->iStatKeys();
        
$time = (microtime(1) - $this->istat_time) * 1000;
        
$hit['time'] = $time;
        
$hit['hit_'.$op] = 1;
        
$hit['time_'.$op] = $time;
        
$server $this->s2p[$this->dbe_shard];
        if ((
$hit['fail']??0) || ($hit['backup']??0)) {
            
$server $this->s2b[$this->dbe_shard];
            if (
$hit['fail']??0) {
                
$hit['hit_fail'] = $hit['fail'];
                unset(
$hit['fail']);
                
$hit['time_fail'] = $time;
            }
            if (
$hit['backup']??0) {
                
$hit['hit_backup'] = $hit['backup'];
                unset(
$hit['backup']);
                
$hit['time_backup'] = $time;
            }
        } else {
            
// hits and time per server are only for selects from primary nodes
            
$hit["hit-".$server] = 1;
            
$hit["time-".$server] =$time;
        }
        if (
$op != 'select') {
            
$hit['hit_modify'] = 1;
            
$hit["time_modify"] =$time;
        }
        if (
PHP_SAPI == 'cli') {
            
$hit['hit_cli'] = 1;
            
$hit['time_cli'] = $time;
        } else {
            
$hit['hit_web'] = 1;
            
$hit['time_web'] = $time;
        }
        
#\d_log($_SERVER['REQUEST_URI']."\n  ".$this->st.": ".x2s($hit));
        // accesses to non-main dt by name
        
if ($node == 'other') {
            
$n str_replace("dt."""$this->st);
            
$hit['dt-'.$n] = 1;
        }

        i('Stat''dt-'.$node)->apcHit($hit$data$keys);

    }

    
// cache iStat APC-hit keys in APC
    
function iStatKeys() { # keys
        
$K "dt:".$this->st.".istat_keys"// keys used
        
$keys apcu_fetch($K);
        if (
$keys)
            return 
$keys;
        
$pb array_flip($this->s2p); // ONLY PRIMARY !! + array_flip($this->s2b); // primary + backup servers
        
$keys = [];
        foreach (
qw("select update delete insert modify web cli backup fail") as $param) {
            
$keys[] = "hit-$param"// hits
            
$keys[] = "time-$param"// time
        
}
        foreach (
array_keys($pb) as $server) {
            
$keys[] = "hit-$server"// hits
            
$keys[] = "time-$server"// time-on-server
        
}
        
$keys join(" "$keys);
        
apcu_store($K$keys600);
        return 
$keys;
    }

}


// dbe failover for insert/update/delete
// one node is failed, another one should add task to "to-sync" queue
class DBE_Failover {

    public 
$dt;
    public 
$backup;

    
// backup = false - "Primary Shard is Down"
    // backup = true  - "Backup Shard is Down"
    
function __construct($dt$backup=false) {
        
$this->dt $dt;
        
$this->backup $backup;
    }

    function 
insert($table$data) {
        
$this->_save("insert"$data);
    }

    function 
update($table$q$data) {
        
$this->_save("update"$q);
    }

    function 
delete($table$q) {
        
$this->_save("delete"$q);
    }

    private function 
_save($op$q) {
        
$d  $this->backup $this->dt->_dbe_p() : $this->dt->_dbe_b();
        
$schema $this->backup $this->dt->schema $this->dt->schema."_b";

        if (
$this->dt->uk) {
            
$uk = [];
            foreach (
explode(" "$this->dt->uk) as $field)
                
$uk[] = $q[$field];
            
$uk json_encode($uk);
        } else {
            
$uk $q["id"] ?? "";
        }
        
$d->insert($schema.".queue",
                    [
"op" => $op,
                    
"st"  => $this->dt->st,
                    
"shard" => $this->dt->dbe_shard,
                    
"uk" => NVL($uk""),
                    
"backup" => $this->backup 10]);
    }

    function 
select($what$where) {
        
\Log::notice("error: dt-failover-select ".$this->dt->st." ".x2s(Caller()));
    }

}

// For development or brave people
class DBE_NoBackup {

    function 
__construct() {  }

    function 
insert($table$data) {  }

    function 
update($table$q$data) {  }

    function 
delete($table$q) {  }

    function 
run($sql) {  }

    function 
select($what$where) {
        
\Log::alert("No Backup & Primary Failed");
    }

}

/*
   internal class,
   never instantiate directly. use DistributedTable::i(..)->inserter

   make sure you have enough MEMORY (1M * $max_shards)

   Batch Inserter only works when there are no DOWN nodes

*/
class DT_Batch_Inserter {

    protected 
$DT;  // DistributedTable

    
protected $inserter=[];  // {"alias:shard" => DB_BatchInsert}
    
protected $fields=""// see batchInsertSetup()
    
protected $dirty=0;

    
#const MAXLEN = 1048576;  // buffer per shard, make sure your mysql can read it
    
public $options = [];     // BatchInsert Options
    
public $MAXLEN 262144;  // 256K buffer - use options[maxlen] to change
    
public $backup true;    // make a copy on backup shard

    // flush all unsaved changes
    
public function flush() {
        foreach (
$this->inserter as $as => $I)
            
$I->flush();
        
$this->dirty=0;
    }

    
// data - LIST of items to insert
    // insert + full escape
    
PUBLIC function insert($key, array $data) {
        foreach (
$data as $k => &$d)
            
$d \DB::quote($d);
        
$this->insert_no_escape($key$data);
    }

    
// you HAVE to escape!!
    // use on you own risk
    /* ADMIN */ 
function insert_no_escape($key, array $data) {
        
$shard $this->DT->shard($key);
        
$p $this->DT->s2p[$shard];
        
$this->dirty++;
        
$this->_inserter($p$shard)->add_no_escape($data);
        if (
$this->backup) {
            
$b $this->DT->s2b[$shard];
            
$this->_inserter($b$shardtrue)->add_no_escape($data);  //backup
        
}
    }

    
// options - options for BatchInsert plus some extras
    // extra options:
    //      maxlen    - change default buffer size (256k)
    //      no-backup - do not do backup - ADMIN USE ONLY
    
function __construct($DT$fields, array $options=[]) {
        
$this->DT $DT;
        
$this->fields $fields;
        if (
$ml hash_unset($options"maxlen"))
            
$this->MAXLEN $ml;
        if (
hash_unset($options"no-backup"))
            
$this->backup false;
        
$this->options $options;
    }

    
// alias, shard
    
protected function _inserter($a$s$b=false) { # Db_BatchInsert
        
$K "$a:$s:$b";  // :$b is excessive,but we are going for most common case
        
if ($i = ($this->inserter[$K]??null))
            return 
$i;
        
$table $b $this->DT->_bt($s) : $this->DT->_t($s);
        
$this->inserter[$K] =
            new 
\DB_BatchInsert("$a:$table",
                               
qw($this->fields),
                               
$this->options,
                               
$this->MAXLEN);
        return 
$this->inserter[$K];
    }

    
// it is a really bad idea to rely on destructors for saving data
    // we'll flush it and add notice about bad code
    
function __destruct() {
        if (! 
$this->dirty)
            return;
        
$this->flush();
        echo 
"DT_Batch_Inserter Destructor. Missing flush.\n";
    }

}

/*

    Issue SAME Update command on lots of shards

    update XXX set $command where UK in (...)

    Example:

    >  DB_BatchUpdate::$debug=1
    > ; $PFL_BATCH_UPDATER =i("dt-pfl")->updater(["set" => "has=has|512",
        "batch_size" => "10000",
        "fields" => "lname_id,fname_id,region",    # Shard Key is First Field, Important
        "where" => "has & 512 = 0"])
    # ->update($uk);   // UK - unique key  !!! imporant Shard Key must be $uk[0] !!!
    >  $PFL_BATCH_UPDATER->update([8265,2108915,0])
    > $PFL_BATCH_UPDATER->flush()
    dt-b
        UPDATE dt.pfl_299
           SET has=has|512
         WHERE has & 512 = 0 AND (lname_id,fname_id,region) in ((8265,2108915,0))
    updater flush 1
    > i("dt-pfl")->select(8265, ["lname_id" => 8265, "fname_id" => 2108915, "region" => 0], "lname_id, fname_id, region, has")
    Array(
    0  =>
         fname_id => 2108915
         lname_id => 8265
         region => 0
         has => 545
    )

*/
class DT_Batch_Updater {

    protected 
$updater = []; // shard => Batch_Update
    
protected $dirty=0;
    public 
$DT;  // DistributedTable
    
public $options = [];     // BatchUpdate Options

    /*
        options:
            fields
            set
            batch_size     - DEFAULT is 1000 (10 times less than usual BatchUpdate)
            no-backup
    */
    
function __construct($DT, array $options) {
        
$this->DT $DT;
        
$this->options $options + ["batch_size" => 1000];  //
    
}

    
// IF UK is ARRAY = shard key MUST be first!!
    
function update($uk) {
        
$this->dirty++;
        
$key is_array($uk) ? $uk[0] : $uk;
        
$shard $this->DT->shard($key);
        
//shards down [120-130,451-475]
        
if ($this->DT->primary_down[$shard]) {
           
// sad but we have to ignore down shards
           // have to specifically reimport them l8
           
return;
        }

        
$alias $this->DT->s2p[$shard];
        
$this->_updater($alias$shard)->update($uk);  // pri
        #if (empty($this->options["no-backup"])) {
        #    $b = $this->DT->s2b[$shard];
        #    $this->_updater($b, $shard, true)->update($uk);  // backup
        #}
    
}

    
// flush all unsaved changes
    
public function flush() {
        foreach (
$this->updater as $as => $I)
            if (
$I)
                
$I->flush();
        
$this->dirty=0;
    }

   
// alias, shard
    
protected function _updater($a$s$b=false) { # Db_BatchUpdater
        
$K "$a:$s:$b";  // :$b is excessive,but we are going for most common case
        
if ($i = ($this->updater[$K]??null))
            return 
$i;
        
$table $b $this->DT->_bt($s) : $this->DT->_t($s);
        
$this->updater[$K] = new \DB_BatchUpdate("$a:$table"$this->options);
        return 
$this->updater[$K];
    }

    
// it is a really bad idea to rely on destructors for saving data
    // we'll flush it and add notice about bad code
    
function __destruct() {
        if (! 
$this->dirty)
            return;
        
$this->flush();
        echo 
"DT_Batch_Updater. Missing flush.\n";
    }

}