mirror of
https://github.com/VCMP-SqMod/SqMod.git
synced 2025-01-19 12:07:13 +01:00
514 lines
15 KiB
C
514 lines
15 KiB
C
/************************************************************************************
|
|
Copyright (C) 2018 MariaDB Corpoeation AB
|
|
|
|
This library is free software; you can redistribute it and/or
|
|
modify it under the terms of the GNU Library General Public
|
|
License as published by the Free Software Foundation; either
|
|
version 2 of the License, or (at your option) any later version.
|
|
|
|
This library is distributed in the hope that it will be useful,
|
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
|
Library General Public License for more details.
|
|
|
|
You should have received a copy of the GNU Library General Public
|
|
License along with this library; if not see <http://www.gnu.org/licenses>
|
|
or write to the Free Software Foundation, Inc.,
|
|
51 Franklin St., Fifth Floor, Boston, MA 02110, USA
|
|
|
|
*************************************************************************************/
|
|
|
|
#include <ma_global.h>
|
|
#include <ma_sys.h>
|
|
#include <mysql.h>
|
|
#include <errmsg.h>
|
|
#include <stdlib.h>
|
|
#include <string.h>
|
|
#include <stdarg.h>
|
|
#include <zlib.h>
|
|
#include <mariadb_rpl.h>
|
|
|
|
static int rpl_alloc_string(MARIADB_RPL_EVENT *event,
|
|
MARIADB_STRING *s,
|
|
unsigned char *buffer,
|
|
size_t len)
|
|
{
|
|
if (!(s->str= ma_alloc_root(&event->memroot, len)))
|
|
return 1;
|
|
memcpy(s->str, buffer, len);
|
|
s->length= len;
|
|
return 0;
|
|
}
|
|
|
|
MARIADB_RPL * STDCALL mariadb_rpl_init_ex(MYSQL *mysql, unsigned int version)
|
|
{
|
|
MARIADB_RPL *rpl;
|
|
|
|
if (version < MARIADB_RPL_REQUIRED_VERSION ||
|
|
version > MARIADB_RPL_VERSION)
|
|
{
|
|
my_set_error(mysql, CR_VERSION_MISMATCH, SQLSTATE_UNKNOWN, 0, version,
|
|
MARIADB_RPL_VERSION, MARIADB_RPL_REQUIRED_VERSION);
|
|
return 0;
|
|
}
|
|
|
|
if (!mysql)
|
|
return NULL;
|
|
|
|
if (!(rpl= (MARIADB_RPL *)calloc(1, sizeof(MARIADB_RPL))))
|
|
{
|
|
SET_CLIENT_ERROR(mysql, CR_OUT_OF_MEMORY, SQLSTATE_UNKNOWN, 0);
|
|
return 0;
|
|
}
|
|
rpl->version= version;
|
|
rpl->mysql= mysql;
|
|
return rpl;
|
|
}
|
|
|
|
void STDCALL mariadb_free_rpl_event(MARIADB_RPL_EVENT *event)
|
|
{
|
|
if (event)
|
|
{
|
|
ma_free_root(&event->memroot, MYF(0));
|
|
free(event);
|
|
}
|
|
}
|
|
|
|
int STDCALL mariadb_rpl_open(MARIADB_RPL *rpl)
|
|
{
|
|
unsigned char *ptr, *buf;
|
|
if (!rpl || !rpl->mysql)
|
|
return 1;
|
|
|
|
/* COM_BINLOG_DUMP:
|
|
Ofs Len Data
|
|
0 1 COM_BINLOG_DUMP
|
|
1 4 position
|
|
5 2 flags
|
|
7 4 server id
|
|
11 * filename
|
|
|
|
* = filename length
|
|
|
|
*/
|
|
ptr= buf=
|
|
#ifdef WIN32
|
|
(unsigned char *)_alloca(rpl->filename_length + 11);
|
|
#else
|
|
(unsigned char *)alloca(rpl->filename_length + 11);
|
|
#endif
|
|
|
|
int4store(ptr, (unsigned int)rpl->start_position);
|
|
ptr+= 4;
|
|
int2store(ptr, rpl->flags);
|
|
ptr+= 2;
|
|
int4store(ptr, rpl->server_id);
|
|
ptr+= 4;
|
|
memcpy(ptr, rpl->filename, rpl->filename_length);
|
|
ptr+= rpl->filename_length;
|
|
|
|
if (ma_simple_command(rpl->mysql, COM_BINLOG_DUMP, (const char *)buf, ptr - buf, 1, 0))
|
|
return 1;
|
|
return 0;
|
|
}
|
|
|
|
MARIADB_RPL_EVENT * STDCALL mariadb_rpl_fetch(MARIADB_RPL *rpl, MARIADB_RPL_EVENT *event)
|
|
{
|
|
unsigned char *ev;
|
|
size_t len;
|
|
MARIADB_RPL_EVENT *rpl_event= 0;
|
|
|
|
if (!rpl || !rpl->mysql)
|
|
return 0;
|
|
|
|
while (1) {
|
|
unsigned long pkt_len= ma_net_safe_read(rpl->mysql);
|
|
|
|
if (pkt_len == packet_error)
|
|
{
|
|
rpl->buffer_size= 0;
|
|
return 0;
|
|
}
|
|
|
|
/* EOF packet:
|
|
see https://mariadb.com/kb/en/library/eof_packet/
|
|
Packet length must be less than 9 bytes, EOF header
|
|
is 0xFE.
|
|
*/
|
|
if (pkt_len < 9 && rpl->mysql->net.read_pos[0] == 0xFE)
|
|
{
|
|
rpl->buffer_size= 0;
|
|
return 0;
|
|
}
|
|
|
|
/* if ignore heartbeat flag was set, we ignore this
|
|
record and continue to fetch next record.
|
|
The first byte is always status byte (0x00)
|
|
For event header description see
|
|
https://mariadb.com/kb/en/library/2-binlog-event-header/ */
|
|
if (rpl->flags & MARIADB_RPL_IGNORE_HEARTBEAT)
|
|
{
|
|
if (rpl->mysql->net.read_pos[1 + 4] == HEARTBEAT_LOG_EVENT)
|
|
continue;
|
|
}
|
|
|
|
rpl->buffer_size= pkt_len;
|
|
rpl->buffer= rpl->mysql->net.read_pos;
|
|
|
|
if (event)
|
|
{
|
|
MA_MEM_ROOT memroot= event->memroot;
|
|
rpl_event= event;
|
|
ma_free_root(&memroot, MYF(MY_KEEP_PREALLOC));
|
|
memset(rpl_event, 0, sizeof(MARIADB_RPL_EVENT));
|
|
rpl_event->memroot= memroot;
|
|
} else {
|
|
if (!(rpl_event = (MARIADB_RPL_EVENT *)malloc(sizeof(MARIADB_RPL_EVENT))))
|
|
goto mem_error;
|
|
memset(rpl_event, 0, sizeof(MARIADB_RPL_EVENT));
|
|
ma_init_alloc_root(&rpl_event->memroot, 8192, 0);
|
|
}
|
|
rpl_event->checksum= uint4korr(rpl->buffer + rpl->buffer_size - 4);
|
|
|
|
rpl_event->ok= rpl->buffer[0];
|
|
rpl_event->timestamp= uint4korr(rpl->buffer + 1);
|
|
rpl_event->event_type= (unsigned char)*(rpl->buffer + 5);
|
|
rpl_event->server_id= uint4korr(rpl->buffer + 6);
|
|
rpl_event->event_length= uint4korr(rpl->buffer + 10);
|
|
rpl_event->next_event_pos= uint4korr(rpl->buffer + 14);
|
|
rpl_event->flags= uint2korr(rpl->buffer + 18);
|
|
|
|
ev= rpl->buffer + EVENT_HEADER_OFS;
|
|
|
|
if (rpl->use_checksum)
|
|
{
|
|
rpl_event->checksum= *(ev + rpl_event->event_length - 4);
|
|
rpl_event->event_length-= 4;
|
|
}
|
|
|
|
switch(rpl_event->event_type) {
|
|
case HEARTBEAT_LOG_EVENT:
|
|
rpl_event->event.heartbeat.timestamp= uint4korr(ev);
|
|
ev+= 4;
|
|
rpl_event->event.heartbeat.next_position= uint4korr(ev);
|
|
ev+= 4;
|
|
rpl_event->event.heartbeat.type= (uint8_t)*ev;
|
|
ev+= 1;
|
|
rpl_event->event.heartbeat.flags= uint2korr(ev);
|
|
break;
|
|
case BINLOG_CHECKPOINT_EVENT:
|
|
len= uint4korr(ev);
|
|
ev+= 4;
|
|
if (rpl_alloc_string(rpl_event, &rpl_event->event.checkpoint.filename, ev, len))
|
|
goto mem_error;
|
|
break;
|
|
case FORMAT_DESCRIPTION_EVENT:
|
|
rpl_event->event.format_description.format = uint2korr(ev);
|
|
ev+= 2;
|
|
rpl_event->event.format_description.server_version = (char *)(ev);
|
|
ev+= 50;
|
|
rpl_event->event.format_description.timestamp= uint4korr(ev);
|
|
ev+= 4;
|
|
rpl->fd_header_len= rpl_event->event.format_description.header_len= (uint8_t)*ev;
|
|
ev= rpl->buffer + rpl->buffer_size - 5;
|
|
rpl->use_checksum= *ev;
|
|
break;
|
|
case QUERY_EVENT:
|
|
{
|
|
size_t db_len, status_len;
|
|
rpl_event->event.query.thread_id= uint4korr(ev);
|
|
ev+= 4;
|
|
rpl_event->event.query.seconds= uint4korr(ev);
|
|
ev+= 4;
|
|
db_len= *ev;
|
|
ev++;
|
|
rpl_event->event.query.errornr= uint2korr(ev);
|
|
ev+= 2;
|
|
status_len= uint2korr(ev);
|
|
ev+= 2;
|
|
if (rpl_alloc_string(rpl_event, &rpl_event->event.query.status, ev, status_len))
|
|
goto mem_error;
|
|
ev+= status_len;
|
|
|
|
if (rpl_alloc_string(rpl_event, &rpl_event->event.query.database, ev, db_len))
|
|
goto mem_error;
|
|
ev+= db_len + 1; /* zero terminated */
|
|
|
|
/* calculate statement size: buffer + buffer_size - current_ofs (ev) - crc_size */
|
|
len= (size_t)(rpl->buffer + rpl->buffer_size - ev - (rpl->use_checksum ? 4 : 0));
|
|
if (rpl_alloc_string(rpl_event, &rpl_event->event.query.statement, ev, len))
|
|
goto mem_error;
|
|
break;
|
|
}
|
|
case TABLE_MAP_EVENT:
|
|
rpl_event->event.table_map.table_id= uint6korr(ev);
|
|
ev+= 8;
|
|
len= *ev;
|
|
ev++;
|
|
if (rpl_alloc_string(rpl_event, &rpl_event->event.table_map.database, ev, len))
|
|
goto mem_error;
|
|
ev+= len + 1;
|
|
len= *ev;
|
|
ev++;
|
|
if (rpl_alloc_string(rpl_event, &rpl_event->event.table_map.table, ev, len))
|
|
goto mem_error;
|
|
ev+= len + 1;
|
|
rpl_event->event.table_map.column_count= mysql_net_field_length(&ev);
|
|
len= rpl_event->event.table_map.column_count;
|
|
if (rpl_alloc_string(rpl_event, &rpl_event->event.table_map.column_types, ev, len))
|
|
goto mem_error;
|
|
ev+= len;
|
|
len= mysql_net_field_length(&ev);
|
|
if (rpl_alloc_string(rpl_event, &rpl_event->event.table_map.metadata, ev, len))
|
|
goto mem_error;
|
|
break;
|
|
case RAND_EVENT:
|
|
rpl_event->event.rand.first_seed= uint8korr(ev);
|
|
ev+= 8;
|
|
rpl_event->event.rand.second_seed= uint8korr(ev);
|
|
break;
|
|
case INTVAR_EVENT:
|
|
rpl_event->event.intvar.type= *ev;
|
|
ev++;
|
|
rpl_event->event.intvar.value= uint8korr(ev);
|
|
break;
|
|
case USER_VAR_EVENT:
|
|
len= uint4korr(ev);
|
|
ev+= 4;
|
|
if (rpl_alloc_string(rpl_event, &rpl_event->event.uservar.name, ev, len))
|
|
goto mem_error;
|
|
ev+= len;
|
|
if (!(rpl_event->event.uservar.is_null= (uint8)*ev))
|
|
{
|
|
ev++;
|
|
rpl_event->event.uservar.type= *ev;
|
|
ev++;
|
|
rpl_event->event.uservar.charset_nr= uint4korr(ev);
|
|
ev+= 4;
|
|
len= uint4korr(ev);
|
|
ev+= 4;
|
|
if (rpl_alloc_string(rpl_event, &rpl_event->event.uservar.value, ev, len))
|
|
goto mem_error;
|
|
ev+= len;
|
|
if ((unsigned long)(ev - rpl->buffer) < rpl->buffer_size)
|
|
rpl_event->event.uservar.flags= *ev;
|
|
}
|
|
break;
|
|
case START_ENCRYPTION_EVENT:
|
|
rpl_event->event.encryption.scheme= *ev;
|
|
ev++;
|
|
rpl_event->event.encryption.key_version= uint4korr(ev);
|
|
ev+= 4;
|
|
rpl_event->event.encryption.nonce= (char *)ev;
|
|
break;
|
|
case ANNOTATE_ROWS_EVENT:
|
|
len= (uint32)(rpl->buffer + rpl->buffer_size - (unsigned char *)ev - (rpl->use_checksum ? 4 : 0));
|
|
if (rpl_alloc_string(rpl_event, &rpl_event->event.annotate_rows.statement, ev, len))
|
|
goto mem_error;
|
|
break;
|
|
case ROTATE_EVENT:
|
|
rpl_event->event.rotate.position= uint8korr(ev);
|
|
ev+= 8;
|
|
len= rpl_event->event_length - rpl->fd_header_len - 8;
|
|
if (rpl_alloc_string(rpl_event, &rpl_event->event.rotate.filename, ev, len))
|
|
goto mem_error;
|
|
break;
|
|
case XID_EVENT:
|
|
rpl_event->event.xid.transaction_nr= uint8korr(ev);
|
|
break;
|
|
case STOP_EVENT:
|
|
/* nothing to do here */
|
|
break;
|
|
case GTID_EVENT:
|
|
rpl_event->event.gtid.sequence_nr= uint8korr(ev);
|
|
ev+= 8;
|
|
rpl_event->event.gtid.domain_id= uint4korr(ev);
|
|
ev+= 4;
|
|
rpl_event->event.gtid.flags= *ev;
|
|
ev++;
|
|
if (rpl_event->event.gtid.flags & FL_GROUP_COMMIT_ID)
|
|
rpl_event->event.gtid.commit_id= uint8korr(ev);
|
|
break;
|
|
case GTID_LIST_EVENT:
|
|
{
|
|
uint32 i;
|
|
rpl_event->event.gtid_list.gtid_cnt= uint4korr(ev);
|
|
ev++;
|
|
if (!(rpl_event->event.gtid_list.gtid= (MARIADB_GTID *)ma_alloc_root(&rpl_event->memroot, sizeof(MARIADB_GTID) * rpl_event->event.gtid_list.gtid_cnt)))
|
|
goto mem_error;
|
|
for (i=0; i < rpl_event->event.gtid_list.gtid_cnt; i++)
|
|
{
|
|
rpl_event->event.gtid_list.gtid[i].domain_id= uint4korr(ev);
|
|
ev+= 4;
|
|
rpl_event->event.gtid_list.gtid[i].server_id= uint4korr(ev);
|
|
ev+= 4;
|
|
rpl_event->event.gtid_list.gtid[i].sequence_nr= uint8korr(ev);
|
|
ev+= 8;
|
|
}
|
|
break;
|
|
}
|
|
case WRITE_ROWS_EVENT_V1:
|
|
case UPDATE_ROWS_EVENT_V1:
|
|
case DELETE_ROWS_EVENT_V1:
|
|
rpl_event->event.rows.type= rpl_event->event_type - WRITE_ROWS_EVENT_V1;
|
|
if (rpl->fd_header_len == 6)
|
|
{
|
|
rpl_event->event.rows.table_id= uint4korr(ev);
|
|
ev+= 4;
|
|
} else {
|
|
rpl_event->event.rows.table_id= uint6korr(ev);
|
|
ev+= 6;
|
|
}
|
|
rpl_event->event.rows.flags= uint2korr(ev);
|
|
ev+= 2;
|
|
len= rpl_event->event.rows.column_count= mysql_net_field_length(&ev);
|
|
if (!len)
|
|
break;
|
|
if (!(rpl_event->event.rows.column_bitmap =
|
|
(char *)ma_alloc_root(&rpl_event->memroot, (len + 7) / 8)))
|
|
goto mem_error;
|
|
memcpy(rpl_event->event.rows.column_bitmap, ev, (len + 7) / 8);
|
|
ev+= (len + 7) / 8;
|
|
if (rpl_event->event_type == UPDATE_ROWS_EVENT_V1)
|
|
{
|
|
if (!(rpl_event->event.rows.column_update_bitmap =
|
|
(char *)ma_alloc_root(&rpl_event->memroot, (len + 7) / 8)))
|
|
goto mem_error;
|
|
memcpy(rpl_event->event.rows.column_update_bitmap, ev, (len + 7) / 8);
|
|
ev+= (len + 7) / 8;
|
|
}
|
|
len= (rpl->buffer + rpl_event->event_length + EVENT_HEADER_OFS - rpl->fd_header_len) - ev;
|
|
if ((rpl_event->event.rows.row_data_size= len))
|
|
{
|
|
if (!(rpl_event->event.rows.row_data =
|
|
(char *)ma_alloc_root(&rpl_event->memroot, rpl_event->event.rows.row_data_size)))
|
|
goto mem_error;
|
|
memcpy(rpl_event->event.rows.row_data, ev, rpl_event->event.rows.row_data_size);
|
|
}
|
|
break;
|
|
default:
|
|
free(rpl_event);
|
|
return NULL;
|
|
break;
|
|
}
|
|
return rpl_event;
|
|
}
|
|
mem_error:
|
|
free(rpl_event);
|
|
SET_CLIENT_ERROR(rpl->mysql, CR_OUT_OF_MEMORY, SQLSTATE_UNKNOWN, 0);
|
|
return 0;
|
|
}
|
|
|
|
void STDCALL mariadb_rpl_close(MARIADB_RPL *rpl)
|
|
{
|
|
if (!rpl)
|
|
return;
|
|
if (rpl->filename)
|
|
free((void *)rpl->filename);
|
|
free(rpl);
|
|
return;
|
|
}
|
|
|
|
int mariadb_rpl_optionsv(MARIADB_RPL *rpl,
|
|
enum mariadb_rpl_option option,
|
|
...)
|
|
{
|
|
va_list ap;
|
|
int rc= 0;
|
|
|
|
if (!rpl)
|
|
return 1;
|
|
|
|
va_start(ap, option);
|
|
|
|
switch (option) {
|
|
case MARIADB_RPL_FILENAME:
|
|
{
|
|
char *arg1= va_arg(ap, char *);
|
|
rpl->filename_length= (uint32_t)va_arg(ap, size_t);
|
|
free((void *)rpl->filename);
|
|
rpl->filename= NULL;
|
|
if (rpl->filename_length)
|
|
{
|
|
rpl->filename= (char *)malloc(rpl->filename_length);
|
|
memcpy((void *)rpl->filename, arg1, rpl->filename_length);
|
|
}
|
|
else if (arg1)
|
|
{
|
|
rpl->filename= strdup((const char *)arg1);
|
|
rpl->filename_length= (uint32_t)strlen(rpl->filename);
|
|
}
|
|
break;
|
|
}
|
|
case MARIADB_RPL_SERVER_ID:
|
|
{
|
|
rpl->server_id= va_arg(ap, unsigned int);
|
|
break;
|
|
}
|
|
case MARIADB_RPL_FLAGS:
|
|
{
|
|
rpl->flags= va_arg(ap, unsigned int);
|
|
break;
|
|
}
|
|
case MARIADB_RPL_START:
|
|
{
|
|
rpl->start_position= va_arg(ap, unsigned long);
|
|
break;
|
|
}
|
|
default:
|
|
rc= -1;
|
|
goto end;
|
|
}
|
|
end:
|
|
va_end(ap);
|
|
return rc;
|
|
}
|
|
|
|
int mariadb_rpl_get_optionsv(MARIADB_RPL *rpl,
|
|
enum mariadb_rpl_option option,
|
|
...)
|
|
{
|
|
va_list ap;
|
|
|
|
if (!rpl)
|
|
return 1;
|
|
|
|
va_start(ap, option);
|
|
|
|
switch (option) {
|
|
case MARIADB_RPL_FILENAME:
|
|
{
|
|
const char **name= (const char **)va_arg(ap, char **);
|
|
size_t *len= (size_t*)va_arg(ap, size_t *);
|
|
|
|
*name= rpl->filename;
|
|
*len= rpl->filename_length;
|
|
break;
|
|
}
|
|
case MARIADB_RPL_SERVER_ID:
|
|
{
|
|
unsigned int *id= va_arg(ap, unsigned int *);
|
|
*id= rpl->server_id;
|
|
break;
|
|
}
|
|
case MARIADB_RPL_FLAGS:
|
|
{
|
|
unsigned int *flags= va_arg(ap, unsigned int *);
|
|
*flags= rpl->flags;
|
|
break;
|
|
}
|
|
case MARIADB_RPL_START:
|
|
{
|
|
unsigned long *start= va_arg(ap, unsigned long *);
|
|
*start= rpl->start_position;
|
|
break;
|
|
}
|
|
default:
|
|
va_end(ap);
|
|
return 1;
|
|
break;
|
|
}
|
|
va_end(ap);
|
|
return 0;
|
|
}
|