首页 > 数据库 >POSTGRESQL中ETL、fdw的平行替换

POSTGRESQL中ETL、fdw的平行替换

时间:2024-01-13 19:32:31浏览次数:37  
标签:INSERT POSTGRESQL INTO db VALUES ETL fdw conn


01、简介

“ 在我前两次的文章中,说到postgresql对于python的支持,其实很多功能也就可以封装进入的postgresql数据库中去。比如fdw、etl等,本文将以此为叙述点,进行演示展示”

POSTGRESQL中ETL、fdw的平行替换_python

在postgresql数据库中fdw的支持,在创建和使用上都不上太方便,特别是fdw在用表级别关联的时候,性能会大大折扣,因为fdw的数据并不会落地到本地。所以我们可以利用postgresql对于python的支持,自行封装一个库对库的调度工具,将远端数据进行落地后再次使用。对于使用的便利性,读者可自行对比。

02、postgresql16.1的安装

安装依赖

yum install -y bison flex readline-devel zlib-devel zlib zlib-devel gcc  gcc-c++ openssl-devel python3  python3-devel libicu-devel ncurses-devel sqlite-devel tk-devel gcc make

添加用户

useradd postgres 
vim /etc/sudo

在101行以下添加以下内容

postgres ALL=(ALL)     NOPASSWD: ALL

进入官网找到链接,这里使用源码安装。

wget https://ftp.postgresql.org/pub/source/v16.1/postgresql-16.1.tar.gz

解压并进入解压目录

mv postgresql-16.1.tar.gz /home/postgres
 su - postgres 
 tar -zxf postgresql-16.1.tar.gz
 cd postgresql-16.1

这里编译python支持还是很重要。--with-python 自行构建plpython3u插件

./configure --prefix=/home/postgres/pg --with-openssl  --with-python

make && make install

编辑环境变量

cd 
vim .bash_profile

加入以下环境变量

export PATH=/home/postgres/pg/bin:$PATH 
export PGDATA=/home/postgres/pg/data

加载环境变量

source ~/.bash_profile

初始化数据库

initdb -D $PGDATA -U postgres -W 
(输入超级用户密码两次)
pg_ctl start 
pg_ctl status

进入数据库创建拓展

CREATE EXTENSION plpython3u CASCADE;

02、创建支持跨库访问的函数

首先下载python链接数据库所需module

postgres=# \! pip3 install -i https://mirrors.aliyun.com/pypi/simple/ cx_Oracle pyodbc pymysql --user 
Looking in indexes: https://mirrors.aliyun.com/pypi/simple/
Requirement already satisfied: cx_Oracle in ./.local/lib/python3.6/site-packages (8.3.0)
Collecting pyodbc
  Downloading https://mirrors.aliyun.com/pypi/packages/27/5c/5e472d714dea2a634bd79df6b8ace55737a9f50c8fbb3b15521fceda4694/pyodbc-4.0.39-cp36-cp36m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (330 kB)
     |████████████████████████████████| 330 kB 2.8 MB/s            
Collecting pymysql
  Downloading https://mirrors.aliyun.com/pypi/packages/4f/52/a115fe175028b058df353c5a3d5290b71514a83f67078a6482cff24d6137/PyMySQL-1.0.2-py3-none-any.whl (43 kB)
     |████████████████████████████████| 43 kB 2.4 MB/s             
Installing collected packages: pyodbc, pymysql
Successfully installed pymysql-1.0.2 pyodbc-4.0.39

在链接远程Oracle数据库,需要下载指定的客户端,本文使用的是oracle 19C

wget https://download.oracle.com/otn_software/linux/instantclient/1921000/oracle-instantclient19.21-basic-19.21.0.0.0-1.x86_64.rpm
sudo rpm -ivh oracle-instantclient19.21-basic-19.21.0.0.0-1.x86_64.rpm

编辑环境变量

vim /etc/profile

配置以下环境变量值

export LD_LIBRARY_PATH=/usr/lib/oracle/19.21/client64/lib:$LD_LIBRARY_PATH

加载环境变量

source /etc/profile

在postgresql数据库中创建具有跨库链接mysql\oracle\sqlserver功能的function。

CREATE OR REPLACE FUNCTION fdw_db(db_type varchar(100),host VARCHAR(100),port integer, username VARCHAR(100), password VARCHAR(100), db_name VARCHAR(100),tablename varchar(100))
RETURNS text AS $$

import cx_Oracle
import pyodbc
import pymysql

def read_data_from_database(db_type, host, port, username, password, db_name, table_name):
    result_values = []  # Initialize as an empty list

    # 读取Oracle数据库中指定表数据的函数
    if db_type.lower() == 'oracle':
        connection_string = f"{username}/{password}@{host}:{port}/{db_name}"
        connection = cx_Oracle.connect(connection_string)
        cursor = connection.cursor()
        cursor.execute(f'SELECT * FROM {table_name}')
        result = cursor.fetchall()
        cursor.close()
        connection.close()

        # 将结果转换为支持INSERT INTO的VALUES语句
        for row in result:
            values_str = ', '.join([f"'{value}'" if isinstance(value, str) else str(value) for value in row])
            result_values.append(f'({values_str})')

    # 读取SQL Server数据库中指定表数据的函数
    elif db_type.lower() == 'sqlserver':
        connection = pyodbc.connect(f"DRIVER={{SQL Server}};SERVER={host};port={port};DATABASE={db_name};UID={username};PWD={password}")
        cursor = connection.cursor()
        cursor.execute(f'SELECT * FROM {table_name}')
        result = cursor.fetchall()
        cursor.close()
        connection.close()

        # 将结果转换为支持INSERT INTO的VALUES语句
        for row in result:
            values_str = ', '.join([f"'{value}'" if isinstance(value, str) else str(value) for value in row])
            result_values.append(f'({values_str})')

    # 读取MySQL数据库中指定表数据的函数
    elif db_type.lower() == 'mysql':
        connection = pymysql.connect(host=host, user=username, password=password, database=db_name, port=port)
        cursor = connection.cursor()
        cursor.execute(f'SELECT * FROM {table_name}')
        result = cursor.fetchall()
        cursor.close()
        connection.close()

        # 将结果转换为支持INSERT INTO的VALUES语句
        for row in result:
            values_str = ', '.join([f"'{value}'" if isinstance(value, str) else str(value) for value in row])
            result_values.append(f'({values_str})')

    else:
        raise ValueError("Unsupported database type. Supported types: 'oracle', 'sqlserver', 'mysql'")

    # 返回拼接的VALUES子句
    return ', '.join(result_values)

insert_values = read_data_from_database(db_type, host, port, username, password, db_name, tablename)
return insert_values


$$ LANGUAGE plpython3u;

以Oracle作为测试 在Oracle 和PG中均创建测试表conn_fdw postgresql

-- 创建表 conn_fdw
CREATE TABLE conn_fdw (
    id integer,
    name VARCHAR(50),
    age integer,
    city VARCHAR(50),
    salary integer
);

oracle中

-- 创建表 conn_fdw
CREATE TABLE conn_fdw (
    id NUMBER,
    name VARCHAR2(50),
    age NUMBER,
    city VARCHAR2(50),
    salary NUMBER
);

Oracle中插入数据

-- 插入20行数据
INSERT INTO conn_fdw VALUES (1, 'John', 30, 'New York', 50000);
INSERT INTO conn_fdw VALUES (2, 'Alice', 25, 'Los Angeles', 60000);
INSERT INTO conn_fdw VALUES (3, 'Bob', 35, 'Chicago', 70000);
INSERT INTO conn_fdw VALUES (4, 'Eva', 28, 'San Francisco', 55000);
INSERT INTO conn_fdw VALUES (5, 'Mike', 32, 'Seattle', 65000);
INSERT INTO conn_fdw VALUES (6, 'Sophia', 29, 'Boston', 75000);
INSERT INTO conn_fdw VALUES (7, 'David', 27, 'Denver', 52000);
INSERT INTO conn_fdw VALUES (8, 'Emily', 31, 'Austin', 68000);
INSERT INTO conn_fdw VALUES (9, 'Daniel', 26, 'Phoenix', 58000);
INSERT INTO conn_fdw VALUES (10, 'Olivia', 33, 'Houston', 72000);
INSERT INTO conn_fdw VALUES (11, 'Liam', 24, 'Portland', 49000);
INSERT INTO conn_fdw VALUES (12, 'Ava', 34, 'Atlanta', 71000);
INSERT INTO conn_fdw VALUES (13, 'Logan', 30, 'Miami', 62000);
INSERT INTO conn_fdw VALUES (14, 'Mia', 28, 'Dallas', 54000);
INSERT INTO conn_fdw VALUES (15, 'Jackson', 29, 'Minneapolis', 67000);
INSERT INTO conn_fdw VALUES (16, 'Sophie', 31, 'Detroit', 59000);
INSERT INTO conn_fdw VALUES (17, 'William', 27, 'Philadelphia', 70000);
INSERT INTO conn_fdw VALUES (18, 'Emma', 32, 'San Diego', 66000);
INSERT INTO conn_fdw VALUES (19, 'James', 26, 'Raleigh', 63000);
INSERT INTO conn_fdw VALUES (20, 'Avery', 35, 'Tampa', 71000);

此时再结合SQL语言进行处理远程连接传过来数据,再创建一个函数用于调用以上创建fdw_db

CREATE OR REPLACE FUNCTION inset_fdw_db(db_type varchar(100),host VARCHAR(100)
								  ,port integer, username VARCHAR(100), 
								  password VARCHAR(100), db_name VARCHAR(100),
								  tablename varchar(100),target_bale varchar(100))
RETURNS void AS $$
declare 
data_values text;
begin 
SELECT   fdw_db(db_type, host, port, username, password, db_name,tablename) into data_values;
 
EXECUTE 'insert into '||target_bale ||' values'||data_values;
end;

$$ LANGUAGE plpgsql;

进行调用

SELECT   inset_fdw_db('oracle', '192.168.48.1', 1521, 'system', 'system', 'orcl', 'CONN_FDW','public.conn_fdw');

进入数据库中查看 此时数据已经落地

postgres=# select *  from CONN_FDW;
 id | name | age | city | salary 
----+------+-----+------+--------
(0 rows)

postgres=#  SELECT   inset_fdw_db('oracle', '192.168.48.1', 1521, 'system', 'system', 'orcl', 'CONN_FDW','public.conn_fdw');
 inset_fdw_db 
--------------
 
(1 row)


postgres=# select *  from CONN_FDW;
 id |  name   | age |     city      | salary 
----+---------+-----+---------------+--------
  1 | John    |  30 | New York      |  50000
  2 | Alice   |  25 | Los Angeles   |  60000
  3 | Bob     |  35 | Chicago       |  70000
  4 | Eva     |  28 | San Francisco |  55000
  5 | Mike    |  32 | Seattle       |  65000
  6 | Sophia  |  29 | Boston        |  75000
  7 | David   |  27 | Denver        |  52000
  8 | Emily   |  31 | Austin        |  68000
  9 | Daniel  |  26 | Phoenix       |  58000
 10 | Olivia  |  33 | Houston       |  72000
 11 | Liam    |  24 | Portland      |  49000
 12 | Ava     |  34 | Atlanta       |  71000
 13 | Logan   |  30 | Miami         |  62000
 14 | Mia     |  28 | Dallas        |  54000
 15 | Jackson |  29 | Minneapolis   |  67000
 16 | Sophie  |  31 | Detroit       |  59000
 17 | William |  27 | Philadelphia  |  70000
 18 | Emma    |  32 | San Diego     |  66000
 19 | James   |  26 | Raleigh       |  63000
 20 | Avery   |  35 | Tampa         |  71000
(20 rows)

总结

该方法不仅可以应用到数据库对数据库之间,也可以应到,数据库对文件路径下。在postgresql嵌入python代码 其实可以替换掉一些中间件的使用。可控性,定制性也会更强。

标签:INSERT,POSTGRESQL,INTO,db,VALUES,ETL,fdw,conn
From: https://blog.51cto.com/u_16385176/9233023

相关文章

  • 【ubantu22.10】安装部署timescaledbv2.13.0及postgresql v14.10
    一、安装部署postgresql-timescaledbaptinstallgnupgpostgresql-commonapt-transport-httpslsb-releasewget二、运行postgresql存储库设置脚本/usr/share/postgresql-common/pgdg/apt.postgresql.org.sh三、添加timescaledb第三方存储库echo"debhttps://packageclo......
  • 典型的ETL使用场景​
    ETL(Extract,Transform,Load)是一种用于数据集成和数据转换的常用技术。它主要用于从多个数据源中提取数据,对数据进行清洗、转换和整合,最后加载到目标系统中。ETL的使用场景非常广泛,下面将介绍一些常见的ETL使用场景。数据仓库构建ETL技术可以帮助企业从不同的数据源中提取数据,并......
  • postgresql distinct on用法
    除了我们熟知的distinct外,postgresql还支持distincton,它的用途跟mysql没有启用ONLY_FULL_GROUP_BYSQL选项是一样的,针对声明的字段做分组,分组外的字段返回随机一行。如下:lightdb@oradb=#createtablet_distincton(idint,vtext);CREATETABLElightdb@oradb=#lightdb@ora......
  • PGSQL(PostgreSQL)数据库基础篇:PostgreSQL 的 主要优点 、 劣势 、体系结构 、核心功
    PostgreSQL的主要优点1.维护者是PostgreSQLGlobalDevelopmentGroup,首次发布于1989年6月。2.操作系统支持WINDOWS、Linux、UNIX、MACOSX、BSD。3.从基本功能上来看,支持ACID、关联完整性、数据库事务、Unicode多国语言。4.表和视图方面,PostgreSQL支持临时表,而物化视图,可以......
  • postgresql使用sql封装邮箱发送功能
    “ postgresql数据库支持python语言,同样可以封装一下python的邮件发送功能。        本文假设数据库中存有职员的花名册,对花名册进行遍历发送邮件。”01—数据库安装安装依赖yuminstall-ybisonflexreadline-develzlib-develzlibzlib-develgccgcc-c++openssl-d......
  • 在postgresql中用SQL封装python
    “ 对于一个数据库来说,SQL大家肯定最熟悉不过了。但是作为数据库开发者,我认为数据库不应该只是支持SQL语言。应该支持更多编程语言。比如python、java、c++等更多编程语言,让数据库在多种语言之间的管理、功能上实现最大便捷,这才是未来数据库最大的发展趋势。也是国产数据库在实现......
  • 关于腾讯地图geolocation.getLocation 经常定位失败,定位时间过长的解决方法
    今天遇到个项目,腾讯地图定位出现问题,导致地图无法呈现出最近的目标 这是正常的效果,之前一直出现贵州等地点的信息,查看控制台的网络后,发现腾讯的定位失败,要么就是定位时间过长,要20S左右,但是换EDGE浏览器却能正常加载,除了EDGE浏览器都会出现这个问题。随后我去查阅腾讯地图接口......
  • TDSQL(PostgreSQL版本) benchmark性能测试
    一、准备软件包jdk:地址:https://pan.baidu.com/s/1sbgLPROfd9e_valSfv0YAQ提取码:4qpsbenchmark:地址:https://pan.baidu.com/s/1nAHER-BXpgG0LUnR8NbT7Q提取码:xcbu二、安装1、jdk安装:1.1解压安装到34机器的/data/tbase/jdk目录下1.2配置环境变量如下:[tbase@VM_0_37_cent......
  • PostgreSQL 数据库归档最近被问及的问题问题 与 4 毋 处世学
    还是老规矩,技术加生活,先说技术,后说生活的感悟和人生的学习。在PostgreSQL中很少被提及的一个问题,归档,而这里经常有人问这个问题,所以需要写一期来说说关于ARCHIVE的问题。首先我们需要提出几个问题,1为什么要归档,PG中归档了什么2 什么时间进行归档,归档的原理与频率3  要怎么在......
  • 基于ETLCloud的MySQL到SqlServer实时同步解决方案
    背景在以下场景下会用到不同数据库的实时同步问题,比如:数据备份与容灾、多地域数据同步、数据共享与协作、数据分析与报表生成、实时监控与报警系统等等。大多数情况用到的就是数据备份了吧,相同的数据库还好,不同的数据库不能完全兼容就很麻烦,所以会用到SymmetricDS、Maxwell、Debezi......