Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
C
canifa_note
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
Vũ Hoàng Anh
canifa_note
Commits
73593839
Commit
73593839
authored
Aug 07, 2022
by
boojack
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
chore: use `transaction` for migration history
parent
d8e10ba3
Changes
4
Show whitespace changes
Inline
Side-by-side
Showing
4 changed files
with
106 additions
and
55 deletions
+106
-55
root.go
bin/server/cmd/root.go
+4
-1
db.go
store/db/db.go
+45
-20
migration_history.go
store/db/migration_history.go
+49
-24
table.go
store/db/table.go
+8
-10
No files found.
bin/server/cmd/root.go
View file @
73593839
package
cmd
package
cmd
import
(
import
(
"context"
"fmt"
"fmt"
"os"
"os"
...
@@ -26,8 +27,10 @@ type Main struct {
...
@@ -26,8 +27,10 @@ type Main struct {
}
}
func
(
m
*
Main
)
Run
()
error
{
func
(
m
*
Main
)
Run
()
error
{
ctx
:=
context
.
Background
()
db
:=
DB
.
NewDB
(
m
.
profile
)
db
:=
DB
.
NewDB
(
m
.
profile
)
if
err
:=
db
.
Open
();
err
!=
nil
{
if
err
:=
db
.
Open
(
ctx
);
err
!=
nil
{
return
fmt
.
Errorf
(
"cannot open db: %w"
,
err
)
return
fmt
.
Errorf
(
"cannot open db: %w"
,
err
)
}
}
...
...
store/db/db.go
View file @
73593839
package
db
package
db
import
(
import
(
"context"
"database/sql"
"database/sql"
"embed"
"embed"
"errors"
"errors"
...
@@ -38,7 +39,7 @@ func NewDB(profile *profile.Profile) *DB {
...
@@ -38,7 +39,7 @@ func NewDB(profile *profile.Profile) *DB {
return
db
return
db
}
}
func
(
db
*
DB
)
Open
()
(
err
error
)
{
func
(
db
*
DB
)
Open
(
ctx
context
.
Context
)
(
err
error
)
{
// Ensure a DSN is set before attempting to open the database.
// Ensure a DSN is set before attempting to open the database.
if
db
.
profile
.
DSN
==
""
{
if
db
.
profile
.
DSN
==
""
{
return
fmt
.
Errorf
(
"dsn required"
)
return
fmt
.
Errorf
(
"dsn required"
)
...
@@ -53,32 +54,32 @@ func (db *DB) Open() (err error) {
...
@@ -53,32 +54,32 @@ func (db *DB) Open() (err error) {
db
.
Db
=
sqlDB
db
.
Db
=
sqlDB
// If mode is dev, we should migrate and seed the database.
// If mode is dev, we should migrate and seed the database.
if
db
.
profile
.
Mode
==
"dev"
{
if
db
.
profile
.
Mode
==
"dev"
{
if
err
:=
db
.
applyLatestSchema
();
err
!=
nil
{
if
err
:=
db
.
applyLatestSchema
(
ctx
);
err
!=
nil
{
return
fmt
.
Errorf
(
"failed to apply latest schema: %w"
,
err
)
return
fmt
.
Errorf
(
"failed to apply latest schema: %w"
,
err
)
}
}
if
err
:=
db
.
seed
();
err
!=
nil
{
if
err
:=
db
.
seed
(
ctx
);
err
!=
nil
{
return
fmt
.
Errorf
(
"failed to seed: %w"
,
err
)
return
fmt
.
Errorf
(
"failed to seed: %w"
,
err
)
}
}
}
else
{
}
else
{
// If db file not exists, we should migrate the database.
// If db file not exists, we should migrate the database.
if
_
,
err
:=
os
.
Stat
(
db
.
profile
.
DSN
);
errors
.
Is
(
err
,
os
.
ErrNotExist
)
{
if
_
,
err
:=
os
.
Stat
(
db
.
profile
.
DSN
);
errors
.
Is
(
err
,
os
.
ErrNotExist
)
{
err
:=
db
.
applyLatestSchema
()
err
:=
db
.
applyLatestSchema
(
ctx
)
if
err
!=
nil
{
if
err
!=
nil
{
return
fmt
.
Errorf
(
"failed to apply latest schema: %w"
,
err
)
return
fmt
.
Errorf
(
"failed to apply latest schema: %w"
,
err
)
}
}
}
else
{
}
else
{
err
:=
db
.
createMigrationHistoryTable
()
err
:=
db
.
createMigrationHistoryTable
(
ctx
)
if
err
!=
nil
{
if
err
!=
nil
{
return
fmt
.
Errorf
(
"failed to create migration_history table: %w"
,
err
)
return
fmt
.
Errorf
(
"failed to create migration_history table: %w"
,
err
)
}
}
currentVersion
:=
common
.
GetCurrentVersion
(
db
.
profile
.
Mode
)
currentVersion
:=
common
.
GetCurrentVersion
(
db
.
profile
.
Mode
)
migrationHistory
,
err
:=
findMigrationHistory
(
db
.
Db
,
&
MigrationHistoryFind
{})
migrationHistory
,
err
:=
db
.
FindMigrationHistory
(
ctx
,
&
MigrationHistoryFind
{})
if
err
!=
nil
{
if
err
!=
nil
{
return
err
return
err
}
}
if
migrationHistory
==
nil
{
if
migrationHistory
==
nil
{
migrationHistory
,
err
=
upsertMigrationHistory
(
db
.
Db
,
&
MigrationHistoryUpsert
{
migrationHistory
,
err
=
db
.
UpsertMigrationHistory
(
ctx
,
&
MigrationHistoryUpsert
{
Version
:
currentVersion
,
Version
:
currentVersion
,
})
})
if
err
!=
nil
{
if
err
!=
nil
{
...
@@ -105,7 +106,7 @@ func (db *DB) Open() (err error) {
...
@@ -105,7 +106,7 @@ func (db *DB) Open() (err error) {
normalizedVersion
:=
minorVersion
+
".0"
normalizedVersion
:=
minorVersion
+
".0"
if
common
.
IsVersionGreaterThan
(
normalizedVersion
,
migrationHistory
.
Version
)
&&
common
.
IsVersionGreaterOrEqualThan
(
currentVersion
,
normalizedVersion
)
{
if
common
.
IsVersionGreaterThan
(
normalizedVersion
,
migrationHistory
.
Version
)
&&
common
.
IsVersionGreaterOrEqualThan
(
currentVersion
,
normalizedVersion
)
{
println
(
"applying migration for"
,
normalizedVersion
)
println
(
"applying migration for"
,
normalizedVersion
)
if
err
:=
db
.
applyMigrationForMinorVersion
(
minorVersion
);
err
!=
nil
{
if
err
:=
db
.
applyMigrationForMinorVersion
(
ctx
,
minorVersion
);
err
!=
nil
{
return
fmt
.
Errorf
(
"failed to apply minor version migration: %w"
,
err
)
return
fmt
.
Errorf
(
"failed to apply minor version migration: %w"
,
err
)
}
}
}
}
...
@@ -127,20 +128,20 @@ const (
...
@@ -127,20 +128,20 @@ const (
latestSchemaFileName
=
"LATEST__SCHEMA.sql"
latestSchemaFileName
=
"LATEST__SCHEMA.sql"
)
)
func
(
db
*
DB
)
applyLatestSchema
()
error
{
func
(
db
*
DB
)
applyLatestSchema
(
ctx
context
.
Context
)
error
{
latestSchemaPath
:=
fmt
.
Sprintf
(
"%s/%s/%s"
,
"migration"
,
db
.
profile
.
Mode
,
latestSchemaFileName
)
latestSchemaPath
:=
fmt
.
Sprintf
(
"%s/%s/%s"
,
"migration"
,
db
.
profile
.
Mode
,
latestSchemaFileName
)
buf
,
err
:=
migrationFS
.
ReadFile
(
latestSchemaPath
)
buf
,
err
:=
migrationFS
.
ReadFile
(
latestSchemaPath
)
if
err
!=
nil
{
if
err
!=
nil
{
return
fmt
.
Errorf
(
"failed to read latest schema %q, error %w"
,
latestSchemaPath
,
err
)
return
fmt
.
Errorf
(
"failed to read latest schema %q, error %w"
,
latestSchemaPath
,
err
)
}
}
stmt
:=
string
(
buf
)
stmt
:=
string
(
buf
)
if
err
:=
db
.
execute
(
stmt
);
err
!=
nil
{
if
err
:=
db
.
execute
(
ctx
,
stmt
);
err
!=
nil
{
return
fmt
.
Errorf
(
"migrate error: statement:%s err=%w"
,
stmt
,
err
)
return
fmt
.
Errorf
(
"migrate error: statement:%s err=%w"
,
stmt
,
err
)
}
}
return
nil
return
nil
}
}
func
(
db
*
DB
)
applyMigrationForMinorVersion
(
minorVersion
string
)
error
{
func
(
db
*
DB
)
applyMigrationForMinorVersion
(
ctx
context
.
Context
,
minorVersion
string
)
error
{
filenames
,
err
:=
fs
.
Glob
(
migrationFS
,
fmt
.
Sprintf
(
"%s/%s/*.sql"
,
"migration/prod"
,
minorVersion
))
filenames
,
err
:=
fs
.
Glob
(
migrationFS
,
fmt
.
Sprintf
(
"%s/%s/*.sql"
,
"migration/prod"
,
minorVersion
))
if
err
!=
nil
{
if
err
!=
nil
{
return
err
return
err
...
@@ -157,22 +158,32 @@ func (db *DB) applyMigrationForMinorVersion(minorVersion string) error {
...
@@ -157,22 +158,32 @@ func (db *DB) applyMigrationForMinorVersion(minorVersion string) error {
}
}
stmt
:=
string
(
buf
)
stmt
:=
string
(
buf
)
migrationStmt
+=
stmt
migrationStmt
+=
stmt
if
err
:=
db
.
execute
(
stmt
);
err
!=
nil
{
if
err
:=
db
.
execute
(
ctx
,
stmt
);
err
!=
nil
{
return
fmt
.
Errorf
(
"migrate error: statement:%s err=%w"
,
stmt
,
err
)
return
fmt
.
Errorf
(
"migrate error: statement:%s err=%w"
,
stmt
,
err
)
}
}
}
}
tx
,
err
:=
db
.
Db
.
Begin
()
if
err
!=
nil
{
return
err
}
defer
tx
.
Rollback
()
// upsert the newest version to migration_history
// upsert the newest version to migration_history
if
_
,
err
=
upsertMigrationHistory
(
db
.
Db
,
&
MigrationHistoryUpsert
{
if
_
,
err
=
upsertMigrationHistory
(
ctx
,
tx
,
&
MigrationHistoryUpsert
{
Version
:
minorVersion
+
".0"
,
Version
:
minorVersion
+
".0"
,
});
err
!=
nil
{
});
err
!=
nil
{
return
err
return
err
}
}
if
err
:=
tx
.
Commit
();
err
!=
nil
{
return
err
}
return
nil
return
nil
}
}
func
(
db
*
DB
)
seed
()
error
{
func
(
db
*
DB
)
seed
(
ctx
context
.
Context
)
error
{
filenames
,
err
:=
fs
.
Glob
(
seedFS
,
fmt
.
Sprintf
(
"%s/*.sql"
,
"seed"
))
filenames
,
err
:=
fs
.
Glob
(
seedFS
,
fmt
.
Sprintf
(
"%s/*.sql"
,
"seed"
))
if
err
!=
nil
{
if
err
!=
nil
{
return
err
return
err
...
@@ -187,7 +198,7 @@ func (db *DB) seed() error {
...
@@ -187,7 +198,7 @@ func (db *DB) seed() error {
return
fmt
.
Errorf
(
"failed to read seed file, filename=%s err=%w"
,
filename
,
err
)
return
fmt
.
Errorf
(
"failed to read seed file, filename=%s err=%w"
,
filename
,
err
)
}
}
stmt
:=
string
(
buf
)
stmt
:=
string
(
buf
)
if
err
:=
db
.
execute
(
stmt
);
err
!=
nil
{
if
err
:=
db
.
execute
(
ctx
,
stmt
);
err
!=
nil
{
return
fmt
.
Errorf
(
"seed error: statement:%s err=%w"
,
stmt
,
err
)
return
fmt
.
Errorf
(
"seed error: statement:%s err=%w"
,
stmt
,
err
)
}
}
}
}
...
@@ -195,18 +206,22 @@ func (db *DB) seed() error {
...
@@ -195,18 +206,22 @@ func (db *DB) seed() error {
}
}
// excecute runs a single SQL statement within a transaction.
// excecute runs a single SQL statement within a transaction.
func
(
db
*
DB
)
execute
(
stmt
string
)
error
{
func
(
db
*
DB
)
execute
(
ctx
context
.
Context
,
stmt
string
)
error
{
tx
,
err
:=
db
.
Db
.
Begin
()
tx
,
err
:=
db
.
Db
.
Begin
()
if
err
!=
nil
{
if
err
!=
nil
{
return
err
return
err
}
}
defer
tx
.
Rollback
()
defer
tx
.
Rollback
()
if
_
,
err
:=
tx
.
Exec
(
stmt
);
err
!=
nil
{
if
_
,
err
:=
tx
.
ExecContext
(
ctx
,
stmt
);
err
!=
nil
{
return
err
}
if
err
:=
tx
.
Commit
();
err
!=
nil
{
return
err
return
err
}
}
return
tx
.
Commit
()
return
nil
}
}
// minorDirRegexp is a regular expression for minor version directory.
// minorDirRegexp is a regular expression for minor version directory.
...
@@ -234,8 +249,14 @@ func getMinorVersionList() []string {
...
@@ -234,8 +249,14 @@ func getMinorVersionList() []string {
}
}
// createMigrationHistoryTable creates the migration_history table if it doesn't exist.
// createMigrationHistoryTable creates the migration_history table if it doesn't exist.
func
(
db
*
DB
)
createMigrationHistoryTable
()
error
{
func
(
db
*
DB
)
createMigrationHistoryTable
(
ctx
context
.
Context
)
error
{
if
err
:=
createTable
(
db
.
Db
,
`
tx
,
err
:=
db
.
Db
.
Begin
()
if
err
!=
nil
{
return
err
}
defer
tx
.
Rollback
()
if
err
:=
createTable
(
ctx
,
tx
,
`
CREATE TABLE IF NOT EXISTS migration_history (
CREATE TABLE IF NOT EXISTS migration_history (
version TEXT NOT NULL PRIMARY KEY,
version TEXT NOT NULL PRIMARY KEY,
created_ts BIGINT NOT NULL DEFAULT (strftime('%s', 'now'))
created_ts BIGINT NOT NULL DEFAULT (strftime('%s', 'now'))
...
@@ -244,5 +265,9 @@ func (db *DB) createMigrationHistoryTable() error {
...
@@ -244,5 +265,9 @@ func (db *DB) createMigrationHistoryTable() error {
return
err
return
err
}
}
if
err
:=
tx
.
Commit
();
err
!=
nil
{
return
err
}
return
nil
return
nil
}
}
store/db/migration_history.go
View file @
73593839
package
db
package
db
import
(
import
(
"context"
"database/sql"
"database/sql"
"strings"
"strings"
)
)
...
@@ -18,23 +19,61 @@ type MigrationHistoryFind struct {
...
@@ -18,23 +19,61 @@ type MigrationHistoryFind struct {
Version
*
string
Version
*
string
}
}
func
findMigrationHistoryList
(
db
*
sql
.
DB
,
find
*
MigrationHistoryFind
)
([]
*
MigrationHistory
,
error
)
{
func
(
db
*
DB
)
FindMigrationHistory
(
ctx
context
.
Context
,
find
*
MigrationHistoryFind
)
(
*
MigrationHistory
,
error
)
{
tx
,
err
:=
db
.
Db
.
Begin
()
if
err
!=
nil
{
return
nil
,
err
}
defer
tx
.
Rollback
()
list
,
err
:=
findMigrationHistoryList
(
ctx
,
tx
,
find
)
if
err
!=
nil
{
return
nil
,
err
}
if
len
(
list
)
==
0
{
return
nil
,
nil
}
else
{
return
list
[
0
],
nil
}
}
func
(
db
*
DB
)
UpsertMigrationHistory
(
ctx
context
.
Context
,
upsert
*
MigrationHistoryUpsert
)
(
*
MigrationHistory
,
error
)
{
tx
,
err
:=
db
.
Db
.
Begin
()
if
err
!=
nil
{
return
nil
,
err
}
defer
tx
.
Rollback
()
migrationHistory
,
err
:=
upsertMigrationHistory
(
ctx
,
tx
,
upsert
)
if
err
!=
nil
{
return
nil
,
err
}
if
err
:=
tx
.
Commit
();
err
!=
nil
{
return
nil
,
err
}
return
migrationHistory
,
nil
}
func
findMigrationHistoryList
(
ctx
context
.
Context
,
tx
*
sql
.
Tx
,
find
*
MigrationHistoryFind
)
([]
*
MigrationHistory
,
error
)
{
where
,
args
:=
[]
string
{
"1 = 1"
},
[]
interface
{}{}
where
,
args
:=
[]
string
{
"1 = 1"
},
[]
interface
{}{}
if
v
:=
find
.
Version
;
v
!=
nil
{
if
v
:=
find
.
Version
;
v
!=
nil
{
where
,
args
=
append
(
where
,
"version = ?"
),
append
(
args
,
*
v
)
where
,
args
=
append
(
where
,
"version = ?"
),
append
(
args
,
*
v
)
}
}
rows
,
err
:=
db
.
Query
(
`
query
:=
`
SELECT
SELECT
version,
version,
created_ts
created_ts
FROM
FROM
migration_history
migration_history
WHERE `
+
strings
.
Join
(
where
,
" AND "
)
+
`
WHERE `
+
strings
.
Join
(
where
,
" AND "
)
+
`
ORDER BY created_ts DESC
`
,
ORDER BY created_ts DESC
args
...
,
`
)
rows
,
err
:=
tx
.
QueryContext
(
ctx
,
query
,
args
...
)
if
err
!=
nil
{
if
err
!=
nil
{
return
nil
,
err
return
nil
,
err
}
}
...
@@ -56,21 +95,8 @@ func findMigrationHistoryList(db *sql.DB, find *MigrationHistoryFind) ([]*Migrat
...
@@ -56,21 +95,8 @@ func findMigrationHistoryList(db *sql.DB, find *MigrationHistoryFind) ([]*Migrat
return
migrationHistoryList
,
nil
return
migrationHistoryList
,
nil
}
}
func
findMigrationHistory
(
db
*
sql
.
DB
,
find
*
MigrationHistoryFind
)
(
*
MigrationHistory
,
error
)
{
func
upsertMigrationHistory
(
ctx
context
.
Context
,
tx
*
sql
.
Tx
,
upsert
*
MigrationHistoryUpsert
)
(
*
MigrationHistory
,
error
)
{
list
,
err
:=
findMigrationHistoryList
(
db
,
find
)
query
:=
`
if
err
!=
nil
{
return
nil
,
err
}
if
len
(
list
)
==
0
{
return
nil
,
nil
}
else
{
return
list
[
0
],
nil
}
}
func
upsertMigrationHistory
(
db
*
sql
.
DB
,
upsert
*
MigrationHistoryUpsert
)
(
*
MigrationHistory
,
error
)
{
row
,
err
:=
db
.
Query
(
`
INSERT INTO migration_history (
INSERT INTO migration_history (
version
version
)
)
...
@@ -79,9 +105,8 @@ func upsertMigrationHistory(db *sql.DB, upsert *MigrationHistoryUpsert) (*Migrat
...
@@ -79,9 +105,8 @@ func upsertMigrationHistory(db *sql.DB, upsert *MigrationHistoryUpsert) (*Migrat
SET
SET
version=EXCLUDED.version
version=EXCLUDED.version
RETURNING version, created_ts
RETURNING version, created_ts
`
,
`
upsert
.
Version
,
row
,
err
:=
tx
.
QueryContext
(
ctx
,
query
,
upsert
.
Version
)
)
if
err
!=
nil
{
if
err
!=
nil
{
return
nil
,
err
return
nil
,
err
}
}
...
...
store/db/table.go
View file @
73593839
package
db
package
db
import
(
import
(
"context"
"database/sql"
"database/sql"
"strings"
"strings"
)
)
...
@@ -11,20 +12,19 @@ type Table struct {
...
@@ -11,20 +12,19 @@ type Table struct {
}
}
//lint:ignore U1000 Ignore unused function temporarily for debugging
//lint:ignore U1000 Ignore unused function temporarily for debugging
func
findTable
(
db
*
sql
.
DB
,
tableName
string
)
(
*
Table
,
error
)
{
func
findTable
(
ctx
context
.
Context
,
tx
*
sql
.
Tx
,
tableName
string
)
(
*
Table
,
error
)
{
where
,
args
:=
[]
string
{
"1 = 1"
},
[]
interface
{}{}
where
,
args
:=
[]
string
{
"1 = 1"
},
[]
interface
{}{}
where
,
args
=
append
(
where
,
"type = ?"
),
append
(
args
,
"table"
)
where
,
args
=
append
(
where
,
"type = ?"
),
append
(
args
,
"table"
)
where
,
args
=
append
(
where
,
"name = ?"
),
append
(
args
,
tableName
)
where
,
args
=
append
(
where
,
"name = ?"
),
append
(
args
,
tableName
)
rows
,
err
:=
db
.
Query
(
`
query
:=
`
SELECT
SELECT
tbl_name,
tbl_name,
sql
sql
FROM sqlite_schema
FROM sqlite_schema
WHERE `
+
strings
.
Join
(
where
,
" AND "
),
WHERE `
+
strings
.
Join
(
where
,
" AND "
)
args
...
,
rows
,
err
:=
tx
.
QueryContext
(
ctx
,
query
,
args
...
)
)
if
err
!=
nil
{
if
err
!=
nil
{
return
nil
,
err
return
nil
,
err
}
}
...
@@ -54,13 +54,11 @@ func findTable(db *sql.DB, tableName string) (*Table, error) {
...
@@ -54,13 +54,11 @@ func findTable(db *sql.DB, tableName string) (*Table, error) {
}
}
}
}
func
createTable
(
db
*
sql
.
DB
,
sql
string
)
error
{
func
createTable
(
ctx
context
.
Context
,
tx
*
sql
.
Tx
,
stmt
string
)
error
{
result
,
err
:=
db
.
Exec
(
sql
)
_
,
err
:=
tx
.
ExecContext
(
ctx
,
stmt
)
if
err
!=
nil
{
if
err
!=
nil
{
return
err
return
err
}
}
_
,
err
=
result
.
RowsAffected
()
return
nil
return
err
}
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment