programming4us

MySQL: Replication for High Availability - Procedures (part 7) - Slave Promotion - Slave promotion in Python

4.3. Slave promotion in Python

You have now seen two techniques for promoting a slave: a traditional technique that suffers from a loss of transactions on some slaves, and a more complex technique that recovers all available transactions. The traditional technique is straightforward to implement in Python, so let’s concentrate on the more complicated one. To handle slave promotion this way, it is necessary to:

  • Configure all the slaves correctly

  • Add the tables Global_Trans_ID and Last_Exec_Trans to the master

  • Provide the application code to commit a transaction correctly

  • Write code to automate slave promotion

You can use the Promotable class (shown in Example 10) to handle the new kind of server. As you can see, this example reuses the previously introduced _enable_binlog helper method, and adds a method to set the log-slave-updates option. Because a promotable slave depends on the special tables we showed earlier, adding one also requires adding those tables to the master. To do this, we write a method named _add_global_id_tables. The method assumes that if the tables already exist, they have the correct definition, so no attempt is made to re-create them. However, the Last_Exec_Trans table needs to start with one row for the update to work correctly, so if no warning was produced to indicate that a table already exists, we add a row of NULL values to the newly created table.

Example 10. The definition of a promotable slave role
_GLOBAL_TRANS_ID_DEF = """
CREATE TABLE IF NOT EXISTS Global_Trans_ID (
    number INT UNSIGNED NOT NULL AUTO_INCREMENT,
    PRIMARY KEY (number)
) ENGINE=MyISAM
"""

_LAST_EXEC_TRANS_DEF = """
CREATE TABLE IF NOT EXISTS Last_Exec_Trans (
    server_id INT UNSIGNED DEFAULT NULL,
    trans_id INT UNSIGNED DEFAULT NULL
) ENGINE=InnoDB
"""

# Role, _set_server_id, and _enable_binlog come from the earlier examples.
class Promotable(Role):
    def __init__(self, repl_user, master):
        self.__master = master
        self.__user = repl_user

    def _add_global_id_tables(self, master):
        master.sql(_GLOBAL_TRANS_ID_DEF)
        master.sql(_LAST_EXEC_TRANS_DEF)
        # No warning means Last_Exec_Trans was just created, so seed it
        # with a single row of NULL values.
        if not master.sql("SELECT @@warning_count"):
            master.sql("INSERT INTO Last_Exec_Trans() VALUES ()")

    def _relay_events(self, server, config):
        config.set('mysqld', 'log-slave-updates')

    def imbue(self, server):
        # Fetch and update the configuration
        config = server.get_config()
        self._set_server_id(server, config)
        self._enable_binlog(server, config)
        self._relay_events(server, config)

        # Put the new configuration in place
        server.stop()
        server.put_config(config)
        server.start()

        # Add tables to master
        self._add_global_id_tables(self.__master)

        server.repl_user = self.__master.repl_user


This routine configures the slaves and the master correctly for using global transaction IDs. You still have to update the Last_Exec_Trans table when committing each transaction. In Example 11 you can see an example implementation in PHP for committing transactions. The code is written using PHP, since this is part of the application code and not part of the code for managing the deployment.

Example 11. Code for starting, committing, and aborting transactions
function start_trans($link) {
    mysql_query("START TRANSACTION", $link);
}

function commit_trans($link) {
    mysql_select_db("common", $link);
    mysql_query("SET SQL_LOG_BIN = 0", $link);
    mysql_query("INSERT INTO Global_Trans_ID() VALUES ()", $link);
    $trans_id = mysql_insert_id($link);
    $result = mysql_query("SELECT @@server_id as server_id", $link);
    $row = mysql_fetch_row($result);
    $server_id = $row[0];

    $delete_query = "DELETE FROM Global_Trans_ID WHERE number = %d";
    mysql_query(sprintf($delete_query, $trans_id), $link);
    mysql_query("SET SQL_LOG_BIN = 1", $link);

    $update_query = "UPDATE Last_Exec_Trans SET server_id = %d, trans_id = %d";
    mysql_query(sprintf($update_query, $server_id, $trans_id), $link);
    mysql_query("COMMIT", $link);
}

function rollback_trans($link) {
    mysql_query("ROLLBACK", $link);
}


We can then use this code to commit transactions by calling the functions instead of the usual COMMIT and ROLLBACK. For example, we could write a PHP function to add a message to a database and update a message counter for the user:

function add_message($email, $message, $link) {
    start_trans($link);
    mysql_select_db("common", $link);
    // Escape string values before placing them in SQL text
    $email = mysql_real_escape_string($email, $link);
    $message = mysql_real_escape_string($message, $link);
    $query = sprintf("SELECT user_id FROM user WHERE email = '%s'", $email);
    $result = mysql_query($query, $link);
    $row = mysql_fetch_row($result);
    $user_id = $row[0];

    $update_user = "UPDATE user SET messages = messages + 1 WHERE user_id = %d";
    mysql_query(sprintf($update_user, $user_id), $link);

    $insert_message = "INSERT INTO message VALUES (%d,'%s')";
    mysql_query(sprintf($insert_message, $user_id, $message), $link);
    commit_trans($link);
}

$conn = mysql_connect(":/var/run/mysqld/mysqld1.sock", "root");
add_message('[email protected]', "MySQL Python Replicant rules!", $conn);
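
If the application is written in Python rather than PHP, the same commit sequence could look roughly like the sketch below. It assumes a DB-API 2.0 connection whose driver uses the %s parameter style and exposes cursor.lastrowid (as MySQLdb and PyMySQL do), and that the connection already uses the database holding the two tables. The function name commit_trans simply mirrors Example 11; this is an illustration, not part of the library used elsewhere in this section.

```python
def commit_trans(conn):
    """Commit the open transaction and record its global transaction ID.

    Assumption: conn is a DB-API 2.0 connection to the master whose
    default database contains Global_Trans_ID and Last_Exec_Trans.
    """
    cur = conn.cursor()

    # Allocate a new ID without writing the bookkeeping to the binary log.
    cur.execute("SET SQL_LOG_BIN = 0")
    cur.execute("INSERT INTO Global_Trans_ID() VALUES ()")
    trans_id = cur.lastrowid
    cur.execute("SELECT @@server_id")
    server_id = cur.fetchone()[0]
    cur.execute("DELETE FROM Global_Trans_ID WHERE number = %s", (trans_id,))
    cur.execute("SET SQL_LOG_BIN = 1")

    # This statement is logged, so it reaches the slaves as part of the
    # transaction it marks.
    cur.execute(
        "UPDATE Last_Exec_Trans SET server_id = %s, trans_id = %s",
        (server_id, trans_id))
    conn.commit()
    return server_id, trans_id
```

Returning the pair is a convenience for logging or debugging; the PHP version returns nothing.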


What remains is the job of handling the actual slave promotion in the event of a failure. The procedure was already outlined above, but the implementation is more involved.

In this case, we need to fetch the entire binlog file, since we do not know where to start reading. The fetch_remote_binlog function in Example 12 returns an iterator to the lines of the binary log.

Example 12. Fetching a remote binary log
import subprocess

def fetch_remote_binlog(server, binlog_file):
    command = ["mysqlbinlog",
               "--read-from-remote-server",
               "--force",
               "--host=%s" % (server.host),
               "--user=%s" % (server.sql_user.name)]
    if server.sql_user.passwd:
        command.append("--password=%s" % (server.sql_user.passwd))
    command.append(binlog_file)
    # universal_newlines=True makes the pipe yield text lines rather
    # than bytes on Python 3.
    proc = subprocess.Popen(command, stdout=subprocess.PIPE,
                            universal_newlines=True)
    return iter(proc.stdout)

The iterator returns the lines of the binary log one by one, so the lines have to be further separated into transactions and events to make the binary log easier to work with. In Example 13 you can see a function named group_by_event that groups lines belonging to the same event into a single string, and a function named group_by_trans that groups a stream of events (as returned by group_by_event) into lists, where each list represents a transaction.

Example 13. Parsing the mysqlbinlog output to extract transactions
delimiter = "/*!*/;"

def group_by_event(lines):
    event_lines = []
    for line in lines:
        if line.startswith('#'):
            if line.startswith("# End of log file"):
                del event_lines[-1]
                yield ''.join(event_lines)
                return
            if line.startswith("# at"):
                yield ''.join(event_lines)
                event_lines = []
        event_lines.append(line)

def group_by_trans(lines):
    group = []
    in_transaction = False
    for event in group_by_event(lines):
        group.append(event)
        if event.find(delimiter + "\nBEGIN\n" + delimiter) >= 0:
            in_transaction = True
        elif not in_transaction:
            yield group
            group = []
        else:
            p = event.find("\nCOMMIT")
            if p >= 0 and (event.startswith(delimiter, p+7)
                           or event.startswith(delimiter, p+8)):
                yield group
                group = []
                in_transaction = False


Example 14 shows a function named scan_logfile that scans the mysqlbinlog output for the global transaction IDs that were introduced. The function accepts a master from which to fetch the binlog file, the name of a binlog file to scan (the filename is the name of the binary log on the master), and a callback function on_gid that will be called whenever a global transaction ID is seen. The on_gid function will be called with the global transaction ID (consisting of a server_id and a trans_id) and the binlog position of the end of the transaction.

Example 14. Scanning a binlog file for global transaction IDs
import re

# The opening line of this pattern was lost from the listing; it is
# reconstructed from the UPDATE statement issued in Example 11.
_GIDCRE = re.compile(r"^UPDATE Last_Exec_Trans SET\s+"
                     r"server_id = (?P<server_id>\d+),\s+"
                     r"trans_id = (?P<trans_id>\d+)$", re.MULTILINE)
_HEADCRE = re.compile(r"#\d{6}\s+\d?\d:\d\d:\d\d\s+"
                      r"server id\s+(?P<sid>\d+)\s+"
                      r"end_log_pos\s+(?P<end_pos>\d+)\s+"
                      r"(?P<type>\w+)")

def scan_logfile(master, logfile, on_gid):
    from mysqlrep import Position
    lines = fetch_remote_binlog(master, logfile)
    # Scan the output to find global transaction ID update statements
    for trans in group_by_trans(lines):
        if len(trans) < 3:
            continue
        # Check for an update of the Last_Exec_Trans table
        m = _GIDCRE.search(trans[-2])
        if m:
            server_id = int(m.group("server_id"))
            trans_id = int(m.group("trans_id"))
            # Check for an information comment with end_log_pos. We
            # assume InnoDB tables only, so we can therefore rely on
            # the transactions to end in an Xid event.
            m = _HEADCRE.search(trans[-1])
            if m and m.group("type") == "Xid":
                pos = Position(server_id, logfile, int(m.group("end_pos")))
                on_gid(server_id, trans_id, pos)
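
To see what scan_logfile looks for, the two patterns can be tried on hand-written event text shaped like mysqlbinlog output. The sample strings and the names GID_PATTERN and HEAD_PATTERN below are illustrative assumptions, and the opening "UPDATE Last_Exec_Trans SET" part of the first pattern is inferred from the statement issued in Example 11.

```python
import re

# Assumed to mirror the two patterns used by scan_logfile.
GID_PATTERN = re.compile(r"^UPDATE Last_Exec_Trans SET\s+"
                         r"server_id = (?P<server_id>\d+),\s+"
                         r"trans_id = (?P<trans_id>\d+)$", re.MULTILINE)
HEAD_PATTERN = re.compile(r"#\d{6}\s+\d?\d:\d\d:\d\d\s+"
                          r"server id\s+(?P<sid>\d+)\s+"
                          r"end_log_pos\s+(?P<end_pos>\d+)\s+"
                          r"(?P<type>\w+)")

# Made-up events: the second-to-last and last events of a transaction.
update_event = (
    "# at 1724\n"
    "#091129 18:35:11 server id 1 end_log_pos 1899 Query thread_id=75\n"
    "SET TIMESTAMP=1259516111/*!*/;\n"
    "UPDATE Last_Exec_Trans SET server_id = 1, trans_id = 13\n"
    "/*!*/;\n")
commit_event = (
    "# at 1899\n"
    "#091129 18:35:11 server id 1 end_log_pos 1926 Xid = 1444\n"
    "COMMIT/*!*/;\n")

gid = GID_PATTERN.search(update_event)
head = HEAD_PATTERN.search(commit_event)

# Global transaction ID, event type, and end position of the transaction.
found = (int(gid.group("server_id")), int(gid.group("trans_id")),
         head.group("type"), int(head.group("end_pos")))
print(found)
```

The end position taken from the Xid event is the point in the log at which a slave that stopped on this transaction would resume.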


The code for the last step is given in Example 15. The promote_slave function takes a list of slaves that lost their master, picks the new master from among them, and then reconnects all the other slaves to the promoted slave by scanning its binary logs. The code uses the support function fetch_global_trans_id to fetch the global transaction ID from the table that we introduced.

Example 15. Identifying the new master and reconnecting all slaves to it
def fetch_global_trans_id(slave):
    result = slave.sql("SELECT server_id, trans_id FROM Last_Exec_Trans")
    return (int(result["server_id"]), int(result["trans_id"]))

def promote_slave(slaves):
    slave_info = {}

    # Collect the global transaction ID of each slave
    for slave in slaves:
        slave.connect()
        server_id, trans_id = fetch_global_trans_id(slave)
        slave_info.setdefault(trans_id, []).append((server_id, trans_id, slave))
        slave.disconnect()

    # Pick the slave to promote by taking the slave with the highest
    # global transaction id.
    new_master = slave_info[max(slave_info)].pop()[2]

    def maybe_change_master(server_id, trans_id, position):
        from mysqlrep.utility import change_master
        try:
            for sid, tid, slave in slave_info[trans_id]:
                if slave is not new_master:
                    change_master(slave, new_master, position)
        except KeyError:
            pass

    # Read the master logfiles of the new master.
    new_master.connect()
    logs = [row["Log_name"] for row in new_master.sql("SHOW MASTER LOGS")]
    new_master.disconnect()

    # Read the master logfiles one by one in reverse order, the
    # latest binlog file first.
    logs.reverse()
    for log in logs:
        scan_logfile(new_master, log, maybe_change_master)


Worth noting in the code is that the slaves are collected into a dictionary using the transaction ID from the global transaction ID as a key.
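
The effect of keying the dictionary on the transaction ID can be illustrated without any servers. In the sketch below, plain strings stand in for the slave objects and the (server_id, trans_id) values are made up; group_slaves is a hypothetical helper, not part of the code in Example 15.

```python
def group_slaves(states):
    """Group slaves by the trans_id each one last executed.

    states is an iterable of (slave, server_id, trans_id) tuples.
    """
    slave_info = {}
    for slave, server_id, trans_id in states:
        slave_info.setdefault(trans_id, []).append((server_id, trans_id, slave))
    return slave_info

# Made-up state: two slaves stopped at 245, one got as far as 247.
states = [("slave-1", 1, 245), ("slave-2", 1, 247), ("slave-3", 1, 245)]
slave_info = group_slaves(states)

# The slave that executed the highest trans_id becomes the new master.
new_master = slave_info[max(slave_info)].pop()[2]

# While scanning the new master's binary log, one dictionary lookup
# finds every slave that stopped at a given transaction.
stopped_at_245 = [slave for _, _, slave in slave_info.get(245, [])]
print(new_master, stopped_at_245)
```

Because the lookup is by transaction ID, each transaction found in the binary log costs one dictionary access, no matter how many slaves need to be redirected.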


Note:

With the code in Example 15, there is no guarantee that the transaction IDs will be in order, so if that is important, you will have to take additional measures. The transaction IDs can end up out of order if one transaction fetches a global transaction ID but is interrupted before it can commit, and another transaction then fetches a global transaction ID and commits first. To make sure the transaction IDs reflect the order in which transactions start, add a SELECT ... FOR UPDATE just before fetching a global transaction ID by changing the code as follows:

def commit_trans(cur):
    cur.execute("SELECT * FROM Last_Exec_Trans FOR UPDATE")
    cur.execute("SET SQL_LOG_BIN = 0")
    cur.execute("INSERT INTO Global_Trans_ID() VALUES ()")
    .
    .
    .
    cur.commit()

This will lock the row until the transaction is committed, but it will also slow down the system somewhat, which is wasteful if the ordering is not required.


Stored Procedures to Commit Transactions

The main approach shown in this article to sync servers is to implement the transaction commit procedure in the application, meaning that the application code needs to know table names and the intricacies of how to produce and manipulate the global transaction ID. Once you understand them, the complexities are not as much of a barrier as they might seem at first. Often, you can handle them with relative ease by creating functions in the application code that the application writer can call without having to know the details.

Another approach is to put the transaction commit logic in the database server by using stored procedures. Depending on the situation, this can sometimes be a better alternative. For example, the commit procedure can be changed without having to change the application code.

For this technique to work, it is necessary to put the transaction ID from the Global_Trans_ID table and the server ID into either a user-defined variable or a local variable in the stored routine. Depending on which approach you select, the query in the binary log will look a little different.

Using local variables is less likely to interfere with surrounding code since user-defined variables “leak” out from the stored procedure.

The procedure for committing a transaction will then be:

CREATE PROCEDURE commit_trans ()
    SQL SECURITY DEFINER
BEGIN
    DECLARE trans_id, server_id INT UNSIGNED;
    SET SQL_LOG_BIN = 0;
    INSERT INTO global_trans_id() VALUES ();
    SELECT LAST_INSERT_ID(), @@server_id INTO trans_id, server_id;
    SET SQL_LOG_BIN = 1;
    INSERT INTO last_exec_trans(server_id, trans_id)
        VALUES (server_id, trans_id);
    COMMIT;
END

Committing a transaction from the application code is then simple:

CALL Commit_Trans();

Now the task remains of changing the code that scans the binary log for the global transaction ID. So, how will a call to this procedure appear in the binary log? Well, a quick call to mysqlbinlog shows:

# at 1724
#091129 18:35:11 server id 1 end_log_pos 1899 Query thread_id=75 exec_time=0 error_code=0
SET TIMESTAMP=1259516111/*!*/;
INSERT INTO last_exec_trans(server_id, trans_id)
VALUES ( NAME_CONST('server_id',1), NAME_CONST('trans_id',13))
/*!*/;
# at 1899
#091129 18:35:11 server id 1 end_log_pos 1926 Xid = 1444
COMMIT/*!*/;

As you can see, both the server ID and the transaction ID are clearly visible in the output. How to match this statement using a regular expression is left as an exercise for the reader.
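
One possible starting point for that exercise is sketched below. The pattern name _SP_GIDCRE is hypothetical, and the expression is fitted only to the single mysqlbinlog excerpt shown above, so it should be checked against the output of the server version actually in use.

```python
import re

# Hypothetical pattern for the INSERT written by the stored procedure.
# NAME_CONST('name', value) is how the local variables appear in the log.
_SP_GIDCRE = re.compile(
    r"^INSERT INTO last_exec_trans\(server_id, trans_id\)\s+"
    r"VALUES\s*\(\s*NAME_CONST\('server_id',\s*(?P<server_id>\d+)\),\s*"
    r"NAME_CONST\('trans_id',\s*(?P<trans_id>\d+)\)\s*\)",
    re.MULTILINE)

# Statement text copied from the mysqlbinlog excerpt above.
event = (
    "SET TIMESTAMP=1259516111/*!*/;\n"
    "INSERT INTO last_exec_trans(server_id, trans_id)\n"
    "VALUES ( NAME_CONST('server_id',1), NAME_CONST('trans_id',13))\n"
    "/*!*/;\n")

m = _SP_GIDCRE.search(event)
gid = (int(m.group("server_id")), int(m.group("trans_id")))
print(gid)
```

The group names match those used for the application-side UPDATE statement, so code that reads server_id and trans_id from the match object would not need to change.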
