应用接入指南

应用接入指南

muzimu217

2025-06-19 发布224 浏览 · 0 点赞 · 0 收藏

快速入门 文章中我们介绍了 OpenTenBase 的架构、源码编译安装、集群运行状态、启动停止等内容。

本篇将介绍应用程序如何连接 OpenTenBase 数据库进行建库、建表、数据导入、查询等操作。

OpenTenBase 兼容所有支持 Postgres 协议的客户端连接,这里将详细介绍 JAVA、C 语音、shell 语言、Python、PHP、Golang 这 6 种最常用的开发语言连接 OpenTenBase 的操作方法。

1、JAVA 开发


1.1、创建数据表

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;


public class createtable {
   public static void main( String args[] )
     {
       Connection c = null;
       Statement stmt = null;
       try {
         Class.forName("org.postgresql.Driver");
         c = DriverManager.getConnection("jdbc:postgresql://127.0.0.1:15432/postgres?currentSchema=public&binaryTransfer=false","opentenbase", "opentenbase");
         System.out.println("Opened database successfully");
         stmt = c.createStatement();
         String sql = "create table opentenbase(id int,nickname text) distribute by shard(id) to group default_group" ;
         stmt.executeUpdate(sql);
         stmt.close();
         c.close();
       } catch ( Exception e ) {
         System.err.println( e.getClass().getName()+": "+ e.getMessage() );
         System.exit(0);
       }
       System.out.println("Table created successfully");
     }
}

说明:

  • 这里连接的节点为任意 CN 主节点,后面所有操作,没特别说明,都是连接到 CN 主节点进行操作。

1.2、使用普通协议插入数据

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class insert {
   public static void main(String args[]) {
      Connection c = null;
      Statement stmt = null;
      try {
         Class.forName("org.postgresql.Driver");
         c = DriverManager.getConnection("jdbc:postgresql://127.0.0.1:15432/postgres?currentSchema=public&binaryTransfer=false","opentenbase", "opentenbase");
         c.setAutoCommit(false);
         System.out.println("Opened database successfully");

         stmt = c.createStatement();
         String sql = "INSERT INTO opentenbase (id,nickname) "
               + "VALUES (1,'opentenbase');";
         stmt.executeUpdate(sql);

         sql = "INSERT INTO opentenbase (id,nickname) "
               + "VALUES (2, 'pgxz' ),(3,'pgxc');";
         stmt.executeUpdate(sql);
         stmt.close();
         c.commit();
         c.close();
      } catch (Exception e) {
         System.err.println( e.getClass().getName()+": "+ e.getMessage() );
         System.exit(0);
      }
      System.out.println("Records created successfully");
   }
}

1.3、使用扩展协议插入数据

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.*;
import java.util.Random;

public class insert_prepared {
   public static void main(String args[]) {
      Connection c = null;
      PreparedStatement stmt;
      try {
         Class.forName("org.postgresql.Driver");
         c = DriverManager.getConnection("jdbc:postgresql://127.0.0.1:15432/postgres?currentSchema=public&binaryTransfer=false","opentenbase", "opentenbase");
         c.setAutoCommit(false);
         System.out.println("Opened database successfully");
         //插入数据
         String sql = "INSERT INTO opentenbase (id,nickname) VALUES (?,?)";
         stmt = c.prepareStatement(sql);
         stmt.setInt(1, 9999);
         stmt.setString(2, "opentenbase_prepared");
         stmt.executeUpdate();

         //插入更新
         sql = "INSERT INTO opentenbase (id,nickname) VALUES (?,?) ON CONFLICT(id) DO UPDATE SET nickname=?";
         stmt = c.prepareStatement(sql);
         stmt.setInt(1, 9999);
         stmt.setString(2, "opentenbase_prepared");
         stmt.setString(3, "opentenbase_prepared_update");
         stmt.executeUpdate();

         stmt.close();
         c.commit();
         c.close();
      } catch (Exception e) {
         System.err.println( e.getClass().getName()+": "+ e.getMessage() );
         System.exit(0);
      }
      System.out.println("Records created successfully");
   }
}

1.4 COPY FROM 加载文件到表

此段代码通过 JDBC 使用 PostgreSQL 的 COPY 命令,将 CSV 文件中的数据导入到 OpenTenBase 数据库中的指定表。

示例代码

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;
import org.postgresql.copy.CopyManager;
import org.postgresql.core.BaseConnection;
import java.io.*;

public class copyfrom {
   public static void main( String args[] ) {
       Connection c = null;
       Statement stmt = null;
       FileInputStream fs = null;
       try {
           Class.forName("org.postgresql.Driver");
           c = DriverManager.getConnection("jdbc:postgresql://127.0.0.1:15432/postgres?currentSchema=public&binaryTransfer=false","opentenbase", "opentenbase");
           System.out.println("Opened database successfully");

           CopyManager cm = new CopyManager((BaseConnection) c);
           fs = new FileInputStream("/data/opentenbase/opentenbase.csv");
           String sql = "COPY opentenbase FROM STDIN DELIMITER AS ','";
           cm.copyIn(sql, fs);

           c.close();
           fs.close();
       } catch ( Exception e ) {
           System.err.println( e.getClass().getName()+": "+ e.getMessage() );
           System.exit(0);
       }
       System.out.println("Copy data successfully");
   }
}

说明:

  • JDBC URL: 连接到本地的 OpenTenBase CN 节点(默认端口为 15432)。
  • CSV 文件路径: /data/opentenbase/opentenbase.csv 是要导入的数据源。
  • COPY 命令: 使用 COPY table_name FROM STDIN 格式,并指定字段分隔符为逗号 ,
  • CopyManager: 使用 org.postgresql.copy.CopyManager 提供的 copyIn() 方法执行 COPY 操作。

1.5 COPY TO 导出数据到文件

此段代码演示如何使用 PostgreSQL 的 COPY TO STDOUT 命令,将数据库表中的数据导出到本地 CSV 文件。

示例代码

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;
import org.postgresql.copy.CopyManager;
import org.postgresql.core.BaseConnection;
import java.io.*;

public class copyto {
   public static void main( String args[] ) {
       Connection c = null;
       Statement stmt = null;
       FileOutputStream fs = null;
       try {
           Class.forName("org.postgresql.Driver");
           c = DriverManager.getConnection("jdbc:postgresql://127.0.0.1:15432/postgres?currentSchema=public&binaryTransfer=false","opentenbase", "opentenbase");
           System.out.println("Opened database successfully");

           CopyManager cm = new CopyManager((BaseConnection) c);
           fs = new FileOutputStream("/data/opentenbase/opentenbase.csv");
           String sql = "COPY opentenbase TO STDOUT DELIMITER AS ','";
           cm.copyOut(sql, fs);

           c.close();
           fs.close();
       } catch ( Exception e ) {
           System.err.println( e.getClass().getName()+": "+ e.getMessage() );
           System.exit(0);
       }
       System.out.println("Copy data successfully");
   }
}

说明:

  • 导出文件路径: /data/opentenbase/opentenbase.csv
  • COPY 命令: 使用 COPY table_name TO STDOUT 并指定字段分隔符为逗号 ,
  • CopyManager: 使用 copyOut() 方法将查询结果直接写入输出流。

1.6 JDBC 包下载地址

您可以从以下链接下载适用于 PostgreSQL 兼容的 JDBC 驱动包:

🔗 PostgreSQL JDBC 下载页面


2 C 程序开发

2.1 连接数据库

本节展示如何使用 C 语言连接 OpenTenBase 数据库。使用的是 PostgreSQL 官方客户端库 libpq

示例代码

#include <stdio.h>
#include <stdlib.h>
#include "libpq-fe.h"

int main(int argc, char **argv){
    const char *conninfo;
    PGconn     *conn;

    if (argc > 1)
        conninfo = argv[1];
    else
        conninfo = "dbname = postgres";

    conn = PQconnectdb(conninfo);

    if (PQstatus(conn) != CONNECTION_OK)
        fprintf(stderr, "连接数据库失败: %s",PQerrorMessage(conn));
    else
        printf("连接数据库成功!\n");

    PQfinish(conn);
    return 0;
}

编译命令

gcc -c -I /usr/local/install/opentenbase_pgxz/include/ conn.c
gcc -o conn conn.o -L /usr/local/install/opentenbase_pgxz/lib/ -lpq

执行示例

./conn "host=172.16.0.3 dbname=postgres port=11000"
连接数据库成功!

./conn "host=172.16.0.3 dbname=postgres port=15432 user=opentenbase"
连接数据库成功!

2.2 建立数据表

本节展示如何在 OpenTenBase 中创建一张表并设置分布策略。

示例代码

#include <stdio.h>
#include <stdlib.h>
#include "libpq-fe.h"

int main(int argc, char **argv){
    const char *conninfo;
    PGconn     *conn;
    PGresult   *res;
    const char *sql = "create table opentenbase(id int,nickname text) distribute by shard(id) to group default_group";

    if (argc > 1)
        conninfo = argv[1];
    else
        conninfo = "dbname = postgres";

    conn = PQconnectdb(conninfo);

    if (PQstatus(conn) != CONNECTION_OK)
        fprintf(stderr, "连接数据库失败: %s",PQerrorMessage(conn));
    else
        printf("连接数据库成功!\n");

    res = PQexec(conn, sql);

    if(PQresultStatus(res) != PGRES_COMMAND_OK)
        fprintf(stderr, "建立数据表失败: %s",PQresultErrorMessage(res));
    else
        printf("建立数据表成功!\n");

    PQclear(res);
    PQfinish(conn);
    return 0;
}

编译命令

gcc -c -I /usr/local/install/opentenbase_pgxz/include/ createtable.c
gcc -o createtable createtable.o -L /usr/local/install/opentenbase_pgxz/lib/ -lpq

执行示例

./createtable "port=11000 dbname=postgres"
连接数据库成功!
建立数据表成功!

2.3 插入数据

本节展示如何使用 C 语言向 OpenTenBase 数据库表中插入多条记录。

示例代码

#include <stdio.h>
#include <stdlib.h>
#include "libpq-fe.h"

int main(int argc, char **argv){
    const char *conninfo;
    PGconn     *conn;
    PGresult   *res;
    const char *sql = "INSERT INTO opentenbase (id,nickname) values(1,'opentenbase'),(2,'pgxz')";

    if (argc > 1)
        conninfo = argv[1];
    else
        conninfo = "dbname = postgres";

    conn = PQconnectdb(conninfo);

    if (PQstatus(conn) != CONNECTION_OK)
        fprintf(stderr, "连接数据库失败: %s", PQerrorMessage(conn));
    else
        printf("连接数据库成功!\n");

    res = PQexec(conn, sql);

    if(PQresultStatus(res) != PGRES_COMMAND_OK)
        fprintf(stderr, "插入数据失败: %s", PQresultErrorMessage(res));
    else
        printf("插入数据成功!\n");

    PQclear(res);
    PQfinish(conn);
    return 0;
}

编译命令

gcc -c -I /usr/local/install/opentenbase_pgxz/include/ insert.c
gcc -o insert insert.o -L /usr/local/install/opentenbase_pgxz/lib/ -lpq

执行示例

./insert "dbname=postgres port=15432"
连接数据库成功!
插入数据成功!

2.4 查询数据

本节演示如何使用 C 语言查询 OpenTenBase 数据库中的数据,并输出结果。

示例代码

#include <stdio.h>
#include <stdlib.h>
#include "libpq-fe.h"

int main(int argc, char **argv){
    const char *conninfo;
    PGconn     *conn;
    PGresult   *res;
    const char *sql = "select * from opentenbase";

    if (argc > 1)
        conninfo = argv[1];
    else
        conninfo = "dbname = postgres";

    conn = PQconnectdb(conninfo);

    if (PQstatus(conn) != CONNECTION_OK)
        fprintf(stderr, "连接数据库失败: %s", PQerrorMessage(conn));
    else
        printf("连接数据库成功!\n");

    res = PQexec(conn, sql);

    if(PQresultStatus(res) != PGRES_TUPLES_OK)
        fprintf(stderr, "查询数据失败: %s", PQresultErrorMessage(res));
    else {
        printf("查询数据成功!\n");
        int rownum = PQntuples(res);
        int colnum = PQnfields(res);

        for(int j = 0; j < colnum; ++j)
            printf("%s\t", PQfname(res, j));
        printf("\n");

        for(int i = 0; i < rownum; ++i) {
            for(int j = 0; j < colnum; ++j)
                printf("%s\t", PQgetvalue(res, i, j));
            printf("\n");
        }
    }

    PQclear(res);
    PQfinish(conn);
    return 0;
}

编译命令

gcc -std=c99 -c -I /usr/local/install/opentenbase_pgxz/include/ select.c
gcc -o select select.o -L /usr/local/install/opentenbase_pgxz/lib/ -lpq

执行示例

./select "dbname=postgres port=15432"
连接数据库成功!
查询数据成功!
id      nickname
1       opentenbase
2       pgxz

2.5 流式 COPY 入表

本节介绍如何使用 C 语言通过流式方式将数据直接写入数据库表。

示例代码

#include <string.h>
#include <stdio.h>
#include <stdlib.h>
#include "libpq-fe.h"

int main(int argc, char **argv){
    const char *conninfo;
    PGconn     *conn;
    PGresult   *res;
    const char *buffer = "1,opentenbase\n2,pgxz\n3,opentenbase牛";

    if (argc > 1)
        conninfo = argv[1];
    else
        conninfo = "dbname = postgres";

    conn = PQconnectdb(conninfo);

    if (PQstatus(conn) != CONNECTION_OK)
        fprintf(stderr, "连接数据库失败: %s", PQerrorMessage(conn));
    else
        printf("连接数据库成功!\n");

    res = PQexec(conn, "COPY opentenbase FROM STDIN DELIMITER ',';");
    if(PQresultStatus(res) != PGRES_COPY_IN)
        fprintf(stderr, "copy数据出错1: %s", PQresultErrorMessage(res));
    else {
        int len = strlen(buffer);
        if(PQputCopyData(conn, buffer, len) == 1) {
            if(PQputCopyEnd(conn, NULL) == 1) {
                res = PQgetResult(conn);
                if(PQresultStatus(res) == PGRES_COMMAND_OK)
                    printf("copy数据成功!\n");
                else
                    fprintf(stderr, "copy数据出错2: %s", PQerrorMessage(conn));
            } else {
                fprintf(stderr, "copy数据出错3: %s", PQerrorMessage(conn));
            }
        } else {
            fprintf(stderr, "copy数据出错4: %s", PQerrorMessage(conn));
        }
    }

    PQclear(res);
    PQfinish(conn);
    return 0;
}

编译命令

gcc -c -I /usr/local/install/opentenbase_pgxz/include/ copy.c
gcc -o copy copy.o -L /usr/local/install/opentenbase_pgxz/lib/ -lpq

执行示例

./copy "dbname=postgres port=15432"
连接数据库成功!
copy数据成功!

3 Shell 脚本开发

Shell 脚本可用于批量操作 OpenTenBase 的多个节点,例如执行 SQL、遍历所有 DN 节点等。

示例脚本

#!/bin/sh

if [ $# -ne 0 ]
then
    echo "usage: $0 exec_sql"
    exit 1
fi

exec_sql=$1

masters=`psql -h 172.16.0.29 -d postgres -p 15432 -t -c "select string_agg(node_host, ' ') from (select * from pgxc_node where node_type = 'D' order by node_name) t"`
port_list=`psql -h 172.16.0.29 -d postgres -p 15432 -t -c "select string_agg(node_port::text, ' ') from (select * from pgxc_node where node_type = 'D' order by node_name) t"`
node_cnt=`psql -h 172.16.0.29 -d postgres -p 15432 -t -c "select count(*) from pgxc_node where node_type = 'D'"`

masters=($masters)
ports=($port_list)

echo $node_cnt

flag=0

for((i=0;i<$node_cnt;i++));
do
    seq=$(($i+1))
    master=${masters[$i]}
    port=${ports[$i]}
    echo $master
    echo $port

    psql -h $master -p $port  postgres -c "$exec_sql"
done

该脚本可动态获取所有 DN 节点并执行指定 SQL 操作。


4 Python 程序开发

4.1 安装 psycopg2 模块

[root@VM_0_29_centos ~]# yum install python-psycopg2

4.2 连接数据库

# coding=utf-8
#!/usr/bin/python
import psycopg2

try:
    conn = psycopg2.connect(database="postgres", user="opentenbase", password="", host="172.16.0.29", port="15432")
    print "连接数据库成功"
    conn.close()
except psycopg2.Error as msg:
    print "连接数据库出错,错误详细信息: %s" % (msg.args[0])

执行示例

[opentenbase@VM_0_29_centos python]$ python conn.py
连接数据库成功

4.3 创建数据表

本节展示如何使用 Python 连接 OpenTenBase 并创建一张表。

示例代码

# coding=utf-8
#!/usr/bin/python
import psycopg2

try:
    conn = psycopg2.connect(database="postgres", user="opentenbase", password="", host="172.16.0.29", port="15432")
    print "连接数据库成功"
    cur = conn.cursor()
    sql = """
          create table opentenbase
          (
              id int,
              nickname varchar(100)
          ) distribute by shard(id) to group default_group
          """
    cur.execute(sql)
    conn.commit()
    print "建立数据表成功"
    conn.close()
except psycopg2.Error as msg:
    print "OpenTenBase Error %s" % (msg.args[0])

执行示例

[opentenbase@VM_0_29_centos python]$ python createtable.py
连接数据库成功
建立数据表成功

4.4 插入数据

本节演示如何使用 Python 向 OpenTenBase 表中插入单条或多条记录。

示例代码

# coding=utf-8
#!/usr/bin/python
import psycopg2

try:
    conn = psycopg2.connect(database="postgres", user="opentenbase", password="", host="172.16.0.29", port="15432")
    print "连接数据库成功"
    cur = conn.cursor()
    sql = "insert into opentenbase values(1,'opentenbase'),(2,'opentenbase');"
    cur.execute(sql)

    sql = "insert into opentenbase values(%s,%s)"
    cur.execute(sql, (3, 'pg'))
    conn.commit()
    print "插入数据成功"
    conn.close()
except psycopg2.Error as msg:
    print "操作数据库出错 %s" % (msg.args[0])

执行示例

[opentenbase@VM_0_29_centos python]$ python insert.py
连接数据库成功
插入数据成功

4.5 查询数据

本节演示如何使用 Python 查询 OpenTenBase 数据库中的数据并输出结果。

示例代码

# coding=utf-8
#!/usr/bin/python
import psycopg2

try:
    conn = psycopg2.connect(database="postgres", user="opentenbase", password="", host="172.16.0.29", port="15432")
    print "连接数据库成功"
    cur = conn.cursor()
    sql = "select * from opentenbase"
    cur.execute(sql)
    rows = cur.fetchall()

    for row in rows:
        print "ID = ", row[0]
        print "NICKNAME = ", row[1], "\n"

    conn.close()
except psycopg2.Error as msg:
    print "操作数据库出错 %s" % (msg.args[0])

执行示例

[opentenbase@VM_0_29_centos python]$ python select.py
连接数据库成功
ID =  1
NICKNAME =  opentenbase

ID =  2
NICKNAME =  pgxz

ID =  3
NICKNAME =  pg

4.6 使用 COPY 加载文件到表

本节展示如何使用 Python 将 CSV 文件导入到 OpenTenBase 表中。

示例代码

# coding=utf-8
#!/usr/bin/python
import psycopg2

try:
    conn = psycopg2.connect(database="postgres", user="opentenbase", password="", host="172.16.0.29", port="15432")
    print "连接数据库成功"
    cur = conn.cursor()
    filename = "/data/opentenbase/opentenbase.txt"
    cols = ('id', 'nickname')
    tablename = "public.opentenbase"

    cur.copy_from(file=open(filename), table=tablename, columns=cols, sep=',')
    conn.commit()
    print "导入数据成功"
    conn.close()
except psycopg2.Error as msg:
    print "操作数据库出错 %s" % (msg.args[0])

执行示例

[opentenbase@VM_0_29_centos python]$ python copy_from.py
连接数据库成功
导入数据成功

5 PHP 程序开发

5.1 连接数据库

本节演示如何使用 PHP 连接 OpenTenBase 数据库。

示例代码

<?php
$host = "172.16.0.29";
$port = "15432";
$dbname = "postgres";
$user = "opentenbase";
$password = "";

// 连接数据库
$conn = @pg_connect("host=$host port=$port dbname=$dbname user=$user password=$password");

if (!$conn) {
    $error_msg = @pg_errormessage($conn);
    echo "连接数据库出错,详情:" . $error_msg . "\n<BR>";
    exit;
} else {
    echo "连接数据库成功" . "\n<BR>";
}

// 关闭连接
pg_close($conn);
?>

执行方式

通过浏览器或命令行访问 PHP 页面:

[root@VM_0_47_centos test]# curl http://127.0.0.1:8080/dbsta/test/conn.php
连接数据库成功

5.2 创建数据表

本节展示如何在 OpenTenBase 中创建一张表并设置分布策略。

示例代码

<?php
$host = "172.16.0.29";
$port = "15432";
$dbname = "postgres";
$user = "opentenbase";
$password = "";

// 连接数据库
$conn = @pg_connect("host=$host port=$port dbname=$dbname user=$user password=$password");

if (!$conn) {
    $error_msg = @pg_errormessage($conn);
    echo "连接数据库出错,详情:" . $error_msg . "\n";
    exit;
} else {
    echo "连接数据库成功" . "\n";
}

// 建立数据表
$sql = "create table public.opentenbase(id integer,nickname varchar(100)) distribute by shard(id) to group default_group;";
$result = @pg_exec($conn, $sql);

if (!$result) {
    $error_msg = @pg_errormessage($conn);
    echo "创建数据表出错,详情:" . $error_msg . "\n";
    exit;
} else {
    echo "创建数据表成功" . "\n";
}

// 关闭连接
pg_close($conn);
?>

执行方式

[root@VM_0_47_centos test]# curl http://127.0.0.1:8080/dbsta/test/createtable.php
连接数据库成功
创建数据表成功

5.3 插入数据

本节展示如何使用 PHP 向 OpenTenBase 数据库插入数据。

示例代码

<?php
$host = "172.16.0.29";
$port = "15432";
$dbname = "postgres";
$user = "opentenbase";
$password = "";

// 连接数据库
$conn = @pg_connect("host=$host port=$port dbname=$dbname user=$user password=$password");

if (!$conn) {
    $error_msg = @pg_errormessage($conn);
    echo "连接数据库出错,详情:" . $error_msg . "\n";
    exit;
} else {
    echo "连接数据库成功" . "\n";
}

// 插入数据
$sql = "insert into public.opentenbase values(1,'opentenbase'),(2,'pgxz');";
$result = @pg_exec($conn, $sql);

if (!$result) {
    $error_msg = @pg_errormessage($conn);
    echo "插入数据出错,详情:" . $error_msg . "\n";
    exit;
} else {
    echo "插入数据成功" . "\n";
}

// 关闭连接
pg_close($conn);
?>

执行方式

[opentenbase@VM_0_47_centos test]$ curl http://127.0.0.1:8080/dbsta/test/insert.php
连接数据库成功
插入数据成功

5.4 查询数据

本节演示如何使用 PHP 查询 OpenTenBase 数据库表并输出结果。

示例代码

<?php
$host = "172.16.0.29";
$port = "15432";
$dbname = "postgres";
$user = "opentenbase";
$password = "";

// 连接数据库
$conn = @pg_connect("host=$host port=$port dbname=$dbname user=$user password=$password");

if (!$conn) {
    $error_msg = @pg_errormessage($conn);
    echo "连接数据库出错,详情:" . $error_msg . "\n";
    exit;
} else {
    echo "连接数据库成功" . "\n";
}

// 查询数据
$sql = "select id,nickname from public.opentenbase";
$result = @pg_exec($conn, $sql);

if (!$result) {
    $error_msg = @pg_errormessage($conn);
    echo "查询数据出错,详情:" . $error_msg . "\n";
    exit;
} else {
    echo "插入数据成功" . "\n";
}

$record_num = pg_numrows($result);
echo "返回记录数" . $record_num . "\n";

$rec = pg_fetch_all($result);

for ($i = 0; $i < $record_num; $i++) {
    echo "记录数#" . strval($i + 1) . "\n";
    echo "id:" . $rec[$i]["id"] . "\n";
    echo "nickname:" . $rec[$i]["nickname"] . "\n\n";
}

// 关闭连接
pg_close($conn);
?>

调用方法

[root@VM_0_47_centos ~]# curl http://127.0.0.1:8080/dbsta/test/select.php
连接数据库成功
插入数据成功
返回记录数2
记录数#1
id:1
nickname:opentenbase

记录数#2
id:2
nickname:pgxz

5.5 流数据 COPY 入表

本节演示如何使用 PHP 将内存中的数据通过 pg_copy_from() 函数导入 OpenTenBase 表中。

示例代码

<?php

$host = "172.16.0.29";
$port = "15432";
$dbname = "postgres";
$user = "opentenbase";
$password = "";

// 连接数据库
$conn = @pg_connect("host=$host port=$port dbname=$dbname user=$user password=$password");

if (!$conn) {
    $error_msg = @pg_errormessage($conn);
    echo "连接数据库出错,详情:" . $error_msg . "\n";
    exit;
} else {
    echo "连接数据库成功" . "\n";
}

$row = array("1,opentenbase", "2,pgxz");
$flag = pg_copy_from($conn, "public.opentenbase", $row, ",");

if (!$flag) {
    $error_msg = @pg_errormessage($conn);
    echo "COPY 出错,详情:" . $error_msg . "\n";
} else {
    echo "COPY 成功" . "\n";
}

// 关闭连接
pg_close($conn);

?>

调用方式

curl http://127.0.0.1/dbsta/cron/php_copy_from.php
连接数据库成功
COPY 成功

5.6 COPY TO 导出数据到数组中

本节展示如何使用 PHP 从 OpenTenBase 表中导出数据为数组形式。

示例代码

<?php

$host = "172.16.0.29";
$port = "15432";
$dbname = "postgres";
$user = "opentenbase";
$password = "";

// 连接数据库
$conn = @pg_connect("host=$host port=$port dbname=$dbname user=$user password=$password");

if (!$conn) {
    $error_msg = @pg_errormessage($conn);
    echo "连接数据库出错,详情:" . $error_msg . "\n";
    exit;
} else {
    echo "连接数据库成功" . "\n";
}

$row = pg_copy_to($conn, "public.opentenbase", ",");
if (!$row) {
    $error_msg = @pg_errormessage($conn);
    echo "COPY 出错,详情:" . $error_msg . "\n";
} else {
    print_r($row);
}

// 关闭连接
pg_close($conn);

?>

调用方式

curl http://127.0.0.1/dbsta/cron/php_copy_to.php
连接数据库成功
Array
(
    [0] => 1,opentenbase
    [1] => 2,pgxz
)

6 Golang 程序开发

6.1 连接数据库

本节展示如何使用 Golang 连接 OpenTenBase 数据库。

示例代码

package main

import (
    "fmt"
    "time"

    "github.com/jackc/pgx"
)

func main() {
    var error_msg string

    // 连接数据库
    conn, err := db_connect()
    if err != nil {
        error_msg = "连接数据库失败,详情:" + err.Error()
        write_log("Error", error_msg)
        return
    }
    defer conn.Close()
    write_log("Log", "连接数据库成功")
}

func write_log(log_level string, error_msg string) {
    fmt.Println("访问时间:", time.Now().Format("2006-01-02 15:04:05"))
    fmt.Println("日志级别:", log_level)
    fmt.Println("详细信息:", error_msg)
}

func db_connect() (conn *pgx.Conn, err error) {
    var config pgx.ConnConfig
    config.Host = "127.0.0.1"
    config.User = "opentenbase"
    config.Password = "pgsql"
    config.Database = "postgres"
    config.Port = 15432
    conn, err = pgx.Connect(config)
    return conn, err
}

执行示例

[root@VM_0_29_centos opentenbase]# go run conn.go
访问时间: 2018-04-03 20:40:28
日志级别: Log
详细信息: 连接数据库成功

6.2 创建数据表

本节演示如何使用 Golang 在 OpenTenBase 中创建一张表。

示例代码

package main

import (
    "fmt"
    "time"

    "github.com/jackc/pgx"
)

func main() {
    var error_msg string
    var sql string

    // 连接数据库
    conn, err := db_connect()
    if err != nil {
        error_msg = "连接数据库失败,详情:" + err.Error()
        write_log("Error", error_msg)
        return
    }
    defer conn.Close()
    write_log("Log", "连接数据库成功")

    // 建立数据表
    sql = "create table public.opentenbase(id varchar(20),nickname varchar(100)) distribute by shard(id) to group default_group;"
    _, err = conn.Exec(sql)
    if err != nil {
        error_msg = "创建数据表失败,详情:" + err.Error()
        write_log("Error", error_msg)
        return
    } else {
        write_log("Log", "创建数据表成功")
    }
}

func write_log(log_level string, error_msg string) {
    fmt.Println("访问时间:", time.Now().Format("2006-01-02 15:04:05"))
    fmt.Println("日志级别:", log_level)
    fmt.Println("详细信息:", error_msg)
}

func db_connect() (conn *pgx.Conn, err error) {
    var config pgx.ConnConfig
    config.Host = "127.0.0.1"
    config.User = "opentenbase"
    config.Password = "pgsql"
    config.Database = "postgres"
    config.Port = 15432
    conn, err = pgx.Connect(config)
    return conn, err
}

执行示例

[root@VM_0_29_centos opentenbase]# go run createtable.go
访问时间: 2018-04-03 20:50:24
日志级别: Log
详细信息: 连接数据库成功
访问时间: 2018-04-03 20:50:24
日志级别: Log
详细信息: 创建数据表成功

6.3 插入数据

本节演示如何使用 Golang 向 OpenTenBase 表中插入数据。

示例代码

package main

import (
    "fmt"
    "strings"
    "time"

    "github.com/jackc/pgx"
)

func main() {
    var error_msg string
    var sql string
    var nickname string

    // 连接数据库
    conn, err := db_connect()
    if err != nil {
        error_msg = "连接数据库失败,详情:" + err.Error()
        write_log("Error", error_msg)
        return
    }
    defer conn.Close()
    write_log("Log", "连接数据库成功")

    // 插入数据
    sql = "insert into public.opentenbase values('1','opentenbase'),('2','pgxz');"
    _, err = conn.Exec(sql)
    if err != nil {
        error_msg = "插入数据失败,详情:" + err.Error()
        write_log("Error", error_msg)
        return
    } else {
        write_log("Log", "插入数据成功")
    }

    // 绑定变量插入数据
    sql = "insert into public.opentenbase values($1,$2),($1,$3);"
    _, err = conn.Exec(sql, "3", "postgresql", "postgres")
    if err != nil {
        error_msg = "插入数据失败,详情:" + err.Error()
        write_log("Error", error_msg)
        return
    } else {
        write_log("Log", "插入数据成功")
    }

    // 拼接SQL语句插入数据
    nickname = "OpenTenBase is ' good!"
    sql = "insert into public.opentenbase values('1','" + sql_data_encode(nickname) + "')"
    _, err = conn.Exec(sql)
    if err != nil {
        error_msg = "插入数据失败,详情:" + err.Error()
        write_log("Error", error_msg)
        return
    } else {
        write_log("Log", "插入数据成功")
    }
}

func sql_data_encode(str string) string {
    return strings.Replace(str, "'", "''", -1)
}

func write_log(log_level string, error_msg string) {
    fmt.Println("访问时间:", time.Now().Format("2006-01-02 15:04:05"))
    fmt.Println("日志级别:", log_level)
    fmt.Println("详细信息:", error_msg)
}

func db_connect() (conn *pgx.Conn, err error) {
    var config pgx.ConnConfig
    config.Host = "127.0.0.1"
    config.User = "opentenbase"
    config.Password = "pgsql"
    config.Database = "postgres"
    config.Port = 15432
    conn, err = pgx.Connect(config)
    return conn, err
}

执行示例

[root@VM_0_29_centos opentenbase]# go run insert.go
访问时间: 2018-04-03 21:05:51
日志级别: Log
详细信息: 连接数据库成功
访问时间: 2018-04-03 21:05:51
日志级别: Log
详细信息: 插入数据成功
访问时间: 2018-04-03 21:05:51
日志级别: Log
详细信息: 插入数据成功
访问时间: 2018-04-03 21:05:51
日志级别: Log
详细信息: 插入数据成功

6.4 查询数据

本节演示如何使用 Golang 查询 OpenTenBase 数据并输出结果。

示例代码

package main

import (
    "fmt"
    "strings"
    "time"

    "github.com/jackc/pgx"
)

func main() {
    var error_msg string
    var sql string

    // 连接数据库
    conn, err := db_connect()
    if err != nil {
        error_msg = "连接数据库失败,详情:" + err.Error()
        write_log("Error", error_msg)
        return
    }
    defer conn.Close()
    write_log("Log", "连接数据库成功")

    // 查询数据
    sql = "SELECT id,nickname FROM public.opentenbase LIMIT 2"
    rows, err := conn.Query(sql)
    if err != nil {
        error_msg = "查询数据失败,详情:" + err.Error()
        write_log("Error", error_msg)
        return
    } else {
        write_log("Log", "查询数据成功")
    }

    var nickname, id string
    for rows.Next() {
        err = rows.Scan(&id, &nickname)
        if err != nil {
            error_msg = "执行查询失败,详情:" + err.Error()
            write_log("Error", error_msg)
            return
        }
        error_msg = fmt.Sprintf("id:%s nickname:%s", id, nickname)
        write_log("Log", error_msg)
    }
    rows.Close()
}

func sql_data_encode(str string) string {
    return strings.Replace(str, "'", "''", -1)
}

func write_log(log_level string, error_msg string) {
    fmt.Println("访问时间:", time.Now().Format("2006-01-02 15:04:05"))
    fmt.Println("日志级别:", log_level)
    fmt.Println("详细信息:", error_msg)
}

func db_connect() (conn *pgx.Conn, err error) {
    var config pgx.ConnConfig
    config.Host = "127.0.0.1"
    config.User = "opentenbase"
    config.Password = "pgsql"
    config.Database = "postgres"
    config.Port = 15432
    conn, err = pgx.Connect(config)
    return conn, err
}
[root@VM_0_29_centos opentenbase]# go run select.go
访问时间: 2018-04-09 10:35:50
日志级别: Log
详细信息: 连接数据库成功
访问时间: 2018-04-09 10:35:50
日志级别: Log
详细信息: 查询数据成功
访问时间: 2018-04-09 10:35:50
日志级别: Log
详细信息: id:2 nickname:opentenbase
访问时间: 2018-04-09 10:35:50
日志级别: Log
详细信息: id:3 nickname:postgresql
访问时间: 2018-04-09 10:35:50
日志级别: Log
详细信息: 查询数据成功
访问时间: 2018-04-09 10:35:50
日志级别: Log
详细信息: id:1 nickname:opentenbase

说明:

  • 使用 conn.Query() 方法执行 SQL 查询。
  • 使用 rows.Next() 遍历查询结果。
  • 使用 rows.Scan() 将每行数据映射到变量中。
  • 最后调用 rows.Close() 关闭结果集。

6.5 流数据 COPY FROM 入表

本节演示如何使用 Golang 批量导入数据到 OpenTenBase 表中,利用 PostgreSQL 的高效批量插入机制 COPY FROM

示例代码

package main

import (
    "fmt"
    "math/rand"
    "time"

    "github.com/jackc/pgx"
)

func main() {
    var error_msg string

    // 连接数据库
    conn, err := db_connect()
    if err != nil {
        error_msg = "连接数据库失败,详情:" + err.Error()
        write_log("Error", error_msg)
        return
    }
    defer conn.Close()
    write_log("Log", "连接数据库成功")

    // 构造 5000 行测试数据
    inputRows := [][]interface{}{}
    var id string
    var nickname string
    for i := 0; i < 5000; i++ {
        id = fmt.Sprintf("%d", rand.Intn(10000))
        nickname = fmt.Sprintf("%d", rand.Intn(10000))
        inputRows = append(inputRows, []interface{}{id, nickname})
    }

    // 使用 CopyFrom 插入数据
    copyCount, err := conn.CopyFrom(pgx.Identifier{"opentenbase"}, []string{"id", "nickname"}, pgx.CopyFromRows(inputRows))
    if err != nil {
        error_msg = "执行 COPY FROM 失败,详情:" + err.Error()
        write_log("Error", error_msg)
        return
    }

    if copyCount != len(inputRows) {
        error_msg = fmt.Sprintf("执行 COPY FROM 失败,期望 %d 行,实际插入 %d 行", len(inputRows), copyCount)
        write_log("Error", error_msg)
        return
    } else {
        write_log("Log", "COPY 数据插入成功")
    }
}

/*
功能描述:写入日志处理
参数说明:
log_level -- 日志级别,只能是 Error 或 Log
error_msg -- 日志内容
返回值说明:无
*/
func write_log(log_level string, error_msg string) {
    fmt.Println("访问时间:", time.Now().Format("2006-01-02 15:04:05"))
    fmt.Println("日志级别:", log_level)
    fmt.Println("详细信息:", error_msg)
}

/*
功能描述:连接数据库
参数说明:无
返回值说明:
conn *pgx.Conn -- 连接信息
err error -- 错误信息
*/
func db_connect() (conn *pgx.Conn, err error) {
    var config pgx.ConnConfig
    config.Host = "127.0.0.1"     // 数据库主机 IP 或 Hostname
    config.User = "opentenbase"   // 连接用户
    config.Password = "pgsql"     // 用户密码
    config.Database = "postgres"  // 数据库名
    config.Port = 15432           // 端口号
    conn, err = pgx.Connect(config)
    return conn, err
}

执行示例

[root@VM_0_29_centos opentenbase]# go run copy_from.go
访问时间: 2018-04-09 10:36:40
日志级别: Log
详细信息: 连接数据库成功
访问时间: 2018-04-09 10:36:40
日志级别: Log
详细信息: COPY 数据插入成功

说明:

  • 使用 CopyFrom() 实现高性能批量插入。
  • pgx.CopyFromRows 是一种便捷构造器,用于将二维切片转换为 CopyFromSource 接口。
  • 支持大批量数据导入,性能远高于逐条插入。

6.6 Golang 相关资源包

以下为推荐使用的 Golang 第三方库和链接:

包名 功能说明 GitHub 地址
jackc/pgx PostgreSQL 的 Golang 客户端库 https://github.com/jackc/pgx
pkg/errors 增强错误处理能力 https://github.com/pkg/errors

安装命令(可选):

go get github.com/jackc/pgx/v4
go get github.com/pkg/errors

文章转载于OpenTenBase官网,原链接为:[应用接入指南]

请前往 登录/注册 即可发表您的看法…