From 731abf3f3c5b1c81d4d52996961c9f60196eec3b Mon Sep 17 00:00:00 2001 From: Jan Wieck Date: Fri, 14 Dec 2012 13:24:39 -0500 Subject: [PATCH 1/2] Fix for bug282. There was a problem with the new COPY protocol when a data type or domain used in a replicated column is not in slon's search_path. SPI does not provide a mechanism to get the namespace name of a column's data type. Instead of adding explicit type casting to the apply queries and handing the data in as TEXT Datums, we now use the same technique that PL/pgSQL has used since at least 8.3 and convert the TEXT datums into the requested data type ourselves. --- src/backend/slony1_funcs.c | 116 +++++++++++++++++++++++++++++++++---------- src/slon/remote_worker.c | 4 +- 2 files changed, 91 insertions(+), 29 deletions(-) diff --git a/src/backend/slony1_funcs.c b/src/backend/slony1_funcs.c index 2f3e3d4..18cf726 100644 --- a/src/backend/slony1_funcs.c +++ b/src/backend/slony1_funcs.c @@ -165,6 +165,10 @@ typedef struct apply_cache_entry struct apply_cache_entry *prev; struct apply_cache_entry *next; + FmgrInfo *finfo_input; + Oid *typioparam; + int32 *typmod; + #ifdef APPLY_CACHE_VERIFY char *verifyKey; int evicted; @@ -1289,6 +1293,20 @@ _Slony_I_logApply(PG_FUNCTION_ARGS) /* elog(NOTICE, "cache entry for %s NOT found", cacheKey); */ + /* + * Allocate memory for the function call info to cast + * all datums from TEXT to the required Datum type. 
+ */ + oldContext = MemoryContextSwitchTo(applyCacheContext); + cacheEnt->finfo_input = (FmgrInfo *)palloc(sizeof(FmgrInfo) * (cmdargsn / 2)); + cacheEnt->typioparam = (Oid *)palloc(sizeof(Oid) * (cmdargsn / 2)); + cacheEnt->typmod = (int32 *)palloc(sizeof(int32) * (cmdargsn / 2)); + MemoryContextSwitchTo(oldContext); + + if (cacheEnt->finfo_input == NULL || cacheEnt->typioparam == NULL || + cacheEnt->typmod == NULL) + elog(ERROR, "Slony-I: out of memory in logApply()"); + #ifdef APPLY_CACHE_VERIFY /* @@ -1370,27 +1388,36 @@ _Slony_I_logApply(PG_FUNCTION_ARGS) */ for (i = 0; i < cmdargsn; i += 2) { - char *coltype; + int colnum; + Oid coltype; + Oid typinput; applyQueryIncrease(); /* - * Lookup the column data type in the target relation. + * Lookup the column data type in the target relation and + * remember everything we need to know later to + * cast TEXT to the correct column Datum. */ - coltype = SPI_gettype(target_rel->rd_att, - SPI_fnumber(target_rel->rd_att, querycolnames[i / 2])); - if (coltype == NULL) + colnum = SPI_fnumber(target_rel->rd_att, querycolnames[i / 2]); + coltype = SPI_gettypeid(target_rel->rd_att, colnum); + if (coltype == InvalidOid) elog(ERROR, "Slony-I: type lookup for column %s failed in logApply()", querycolnames[i / 2]); + getTypeInputInfo(coltype, &typinput, + &(cacheEnt->typioparam[i / 2])); + fmgr_info(typinput, &(cacheEnt->finfo_input[i / 2])); + cacheEnt->typmod[i / 2] = + target_rel->rd_att->attrs[colnum - 1]->atttypmod; /* * Add the parameter to the query string */ - sprintf(applyQueryPos, "%s$%d::%s", (i == 0) ? "" : ", ", - i / 2 + 1, coltype); + sprintf(applyQueryPos, "%s$%d", (i == 0) ? 
"" : ", ", + i / 2 + 1); applyQueryPos += strlen(applyQueryPos); - querytypes[i / 2] = TEXTOID; + querytypes[i / 2] = coltype; } /* @@ -1423,22 +1450,30 @@ _Slony_I_logApply(PG_FUNCTION_ARGS) for (i = 0; i < cmdargsn; i += 2) { char *colname; - char *coltype; + int colnum; + Oid coltype; + Oid typinput; applyQueryIncrease(); /* - * Get the column name and data type. + * Get the column name and data type as well as everything + * needed later to cast TEXT to the correct input Datum. */ if (cmdargsnulls[i]) elog(ERROR, "Slony-I: column name in log_cmdargs is NULL"); colname = DatumGetCString(DirectFunctionCall1( textout, cmdargs[i])); - coltype = SPI_gettype(target_rel->rd_att, - SPI_fnumber(target_rel->rd_att, colname)); - if (coltype == NULL) + colnum = SPI_fnumber(target_rel->rd_att, colname); + coltype = SPI_gettypeid(target_rel->rd_att, colnum); + if (coltype == InvalidOid) elog(ERROR, "Slony-I: type lookup for column %s failed in logApply()", colname); + getTypeInputInfo(coltype, &typinput, + &(cacheEnt->typioparam[i / 2])); + fmgr_info(typinput, &(cacheEnt->finfo_input[i / 2])); + cacheEnt->typmod[i / 2] = + target_rel->rd_att->attrs[colnum - 1]->atttypmod; /* * Special case if there were no columns updated. We tell @@ -1468,10 +1503,10 @@ _Slony_I_logApply(PG_FUNCTION_ARGS) * This is inside the SET clause. Add the = * $n:: separated by comma. */ - sprintf(applyQueryPos, "%s%s = $%d::%s", + sprintf(applyQueryPos, "%s%s = $%d", (i > 0) ? ", " : "", slon_quote_identifier(colname), - i / 2 + 1, coltype); + i / 2 + 1); } else { @@ -1479,14 +1514,14 @@ _Slony_I_logApply(PG_FUNCTION_ARGS) * This is in the WHERE clause. Same as above but * separated by AND. */ - sprintf(applyQueryPos, "%s%s = $%d::%s", + sprintf(applyQueryPos, "%s%s = $%d", (i > cmdupdncols * 2) ? 
" AND " : "", slon_quote_identifier(colname), - i / 2 + 1, coltype); + i / 2 + 1); } applyQueryPos += strlen(applyQueryPos); - querytypes[i / 2] = TEXTOID; + querytypes[i / 2] = coltype; } strcpy(applyQueryPos, ";"); @@ -1510,31 +1545,39 @@ _Slony_I_logApply(PG_FUNCTION_ARGS) for (i = 0; i < cmdargsn; i += 2) { + int colnum; char *colname; - char *coltype; + Oid coltype; + Oid typinput; applyQueryIncrease(); /* - * Add = $n:: separated by comma. + * Add = $n separated by comma. */ if (cmdargsnulls[i]) elog(ERROR, "Slony-I: column name in log_cmdargs is NULL"); colname = DatumGetCString(DirectFunctionCall1( textout, cmdargs[i])); - coltype = SPI_gettype(target_rel->rd_att, - SPI_fnumber(target_rel->rd_att, colname)); - if (coltype == NULL) + colnum = SPI_fnumber(target_rel->rd_att, colname); + coltype = SPI_gettypeid(target_rel->rd_att, colnum); + if (coltype == InvalidOid) elog(ERROR, "Slony-I: type lookup for column %s failed in logApply()", colname); - sprintf(applyQueryPos, "%s%s = $%d::%s", + getTypeInputInfo(coltype, &typinput, + &(cacheEnt->typioparam[i / 2])); + fmgr_info(typinput, &(cacheEnt->finfo_input[i / 2])); + cacheEnt->typmod[i / 2] = + target_rel->rd_att->attrs[colnum - 1]->atttypmod; + + sprintf(applyQueryPos, "%s%s = $%d", (i > 0) ? 
" AND " : "", slon_quote_identifier(colname), - i / 2 + 1, coltype); + i / 2 + 1); applyQueryPos += strlen(applyQueryPos); - querytypes[i / 2] = TEXTOID; + querytypes[i / 2] = coltype; } strcpy(applyQueryPos, ";"); @@ -1610,6 +1653,11 @@ _Slony_I_logApply(PG_FUNCTION_ARGS) apply_num_evict++; SPI_freeplan(evict->plan); + oldContext = MemoryContextSwitchTo(applyCacheContext); + pfree(cacheEnt->finfo_input); + pfree(cacheEnt->typioparam); + pfree(cacheEnt->typmod); + MemoryContextSwitchTo(oldContext); evict->plan = NULL; #ifdef APPLY_CACHE_VERIFY evict->evicted = 1; @@ -1670,11 +1718,25 @@ _Slony_I_logApply(PG_FUNCTION_ARGS) for (i = 0; i < cmdargsn; i += 2) { - queryvals[i / 2] = cmdargs[i + 1]; + char *tmpval; + if (cmdargsnulls[i + 1]) + { + queryvals[i / 2] = (Datum)0; querynulls[i / 2] = 'n'; + } else + { + tmpval = DatumGetCString(DirectFunctionCall1(textout, + cmdargs[i + 1])); + queryvals[i / 2] = InputFunctionCall( + &(cacheEnt->finfo_input[i / 2]), + tmpval, + cacheEnt->typioparam[i / 2], + cacheEnt->typmod[i / 2]); + pfree(tmpval); querynulls[i / 2] = ' '; + } } querynulls[cmdargsn / 2] = '\0'; diff --git a/src/slon/remote_worker.c b/src/slon/remote_worker.c index 69de825..4eefd56 100644 --- a/src/slon/remote_worker.c +++ b/src/slon/remote_worker.c @@ -4766,7 +4766,7 @@ sync_helper(void *cdata, PGconn *local_conn) } res = PQgetResult(dbconn); - if (PQresultStatus(res) < 0) + if (PQresultStatus(res) != PGRES_COMMAND_OK) { slon_log(SLON_ERROR, "remoteWorkerThread_%d_%d: error at end of COPY OUT: %s", node->no_id, provider->no_id, @@ -4776,7 +4776,7 @@ sync_helper(void *cdata, PGconn *local_conn) PQclear(res); res = PQgetResult(local_conn); - if (PQresultStatus(res) < 0) + if (PQresultStatus(res) != PGRES_COMMAND_OK) { slon_log(SLON_ERROR, "remoteWorkerThread_%d_%d: error at end of COPY IN: %s", node->no_id, provider->no_id, -- 1.7.0.4