(underconstruction)
Hive merge query for History strategy
Create an external table for intermadiate updates from extracted data and load jobs
CREATE EXTERNAL TABLE staging.update_contributors (
login string,
email string,
nome string,
idcentrocusto string,
nmcentrocusto string,
idempresa string,
nmempresa string,
idunidade string,
nmunidade string,
nucargahoraria string,
nmcargo string,
nmperfil string,
dtadmissao string,
dtdemissao string
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE;
LOAD DATA INPATH '/user/pig/serp' OVERWRITE INTO TABLE staging.update_contributors;
Create destiny table for results after merge action
CREATE TABLE IF NOT EXISTS staging.contributors
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS ORC
tblproperties ('transactional' = 'true', 'orc.compression' = 'ZLIB')
AS (SELECT update_contributors.*, current_timestamp as load_date, FALSE as archived FROM staging.update_contributors);
Create temporary table for manage actions and auxiliary decisions when upserts is necessary
CREATE TEMPORARY TABLE staging.diff_contributors
STORED AS ORC
TBLPROPERTIES ("auto.purge"="true")
AS (
SELECT
update_contributors.*,
CASE
WHEN (update_contributors.login <> contributors.login
OR update_contributors.nome <> contributors.nome
OR update_contributors.idcentrocusto <> contributors.idcentrocusto
OR update_contributors.nmcentrocusto <> contributors.nmcentrocusto
OR update_contributors.idempresa <> contributors.idempresa
OR update_contributors.nmempresa <> contributors.nmempresa
OR update_contributors.idunidade <> contributors.idunidade
OR update_contributors.nmunidade <> contributors.nmunidade
OR update_contributors.nucargahoraria <> contributors.nucargahoraria
OR update_contributors.nmcargo <> contributors.nmcargo
OR update_contributors.nmperfil <> contributors.nmperfil
OR update_contributors.dtadmissao <> contributors.dtadmissao
OR update_contributors.dtdemissao <> contributors.dtdemissao) THEN TRUE
ELSE FALSE
END AS changed
FROM staging.contributors, staging.update_contributors
WHERE contributors.login = update_contributors.login
AND NOT contributors.archived
);
Perform the merge between table of update data and the historical records (tez engine, is required because spark dont need support for merge clause)
MERGE INTO staging.contributors c USING (
SELECT *, FALSE AS to_filed FROM staging.diff_contributors
UNION ALL
SELECT *, TRUE AS to_filed FROM staging.diff_contributors WHERE changed
)
AS updates ON updates.login = c.login AND NOT c.archived
WHEN matched AND updates.changed AND updates.to_filed
THEN UPDATE SET archived = TRUE
WHEN matched AND updates.changed AND NOT updates.to_filed THEN INSERT VALUES (
updates.login, updates.email, updates.nome, updates.idcentrocusto, updates.nmcentrocusto, updates.idempresa,
updates.nmempresa, updates.idunidade, updates.nmunidade, updates.nucargahoraria, updates.nmcargo, updates.nmperfil, updates.dtadmissao,
updates.dtdemissao, current_timestamp, FALSE)
WHEN NOT matched THEN INSERT VALUES (
updates.login, updates.email, updates.nome, updates.idcentrocusto, updates.nmcentrocusto, updates.idempresa,
updates.nmempresa, updates.idunidade, updates.nmunidade, updates.nucargahoraria, updates.nmcargo, updates.nmperfil, updates.dtadmissao,
updates.dtdemissao, current_timestamp, FALSE);