Optimizing data processing by separating old database data by chunks.
This commit is contained in:
parent
90d8005c9a
commit
64bd20e0bf
|
@ -17,6 +17,7 @@ import (
|
||||||
var (
|
var (
|
||||||
goDatabaseUrl = kingpin.Flag("go-database", "magneticod Go version database URL.").Short('g').String()
|
goDatabaseUrl = kingpin.Flag("go-database", "magneticod Go version database URL.").Short('g').String()
|
||||||
pyDatabaseUrl = kingpin.Flag("py-database", "Python version database URL.").Short('p').Required().String()
|
pyDatabaseUrl = kingpin.Flag("py-database", "Python version database URL.").Short('p').Required().String()
|
||||||
|
chunkSize = kingpin.Flag("chunk-size", "How many torrents to retrieve from old DB at once").Short('c').Default("50").Uint64()
|
||||||
|
|
||||||
progress pb.ProgressBar
|
progress pb.ProgressBar
|
||||||
)
|
)
|
||||||
|
@ -63,35 +64,37 @@ func main() {
|
||||||
}
|
}
|
||||||
|
|
||||||
func mergeDatabases(pyDb sql.DB, goDb persistence.Database) error {
|
func mergeDatabases(pyDb sql.DB, goDb persistence.Database) error {
|
||||||
// Selecting torrents
|
maxId, err := old.GetLastTorrentId(pyDb)
|
||||||
tRows, err := pyDb.Query("SELECT id, info_hash, name, total_size, discovered_on FROM torrents ORDER BY id ASC;")
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
zap.L().Fatal("Error when querying torrents from old database.")
|
zap.L().Fatal("Error while getting last torrent ID from old database")
|
||||||
}
|
}
|
||||||
defer tRows.Close()
|
|
||||||
|
|
||||||
for tRows.Next() {
|
var lastId uint64 = 0
|
||||||
var torrent persistence.TorrentMetadata
|
|
||||||
|
|
||||||
progress.Increment()
|
for lastId < maxId {
|
||||||
|
chunk, err := old.GetTorrentsChunk(pyDb, lastId, *chunkSize)
|
||||||
err := tRows.Scan(&torrent.ID, &torrent.InfoHash, &torrent.Name, &torrent.Size, &torrent.DiscoveredOn)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
zap.L().Error("Error scanning torrent row.", zap.Error(err))
|
zap.L().Error("Error while getting torrents chunk", zap.Uint64("last-id", lastId))
|
||||||
|
|
||||||
continue
|
|
||||||
}
|
}
|
||||||
|
|
||||||
files, err := old.GetFilesForTorrent(pyDb, torrent)
|
if len(chunk) == 0 {
|
||||||
if err != nil {
|
if (maxId - lastId) <= *chunkSize {
|
||||||
zap.L().Error("Error getting files for torrent.", zap.String("hash", hex.EncodeToString(torrent.InfoHash)), zap.Error(err))
|
zap.L().Warn("Probably finished processing all data (earlier). Finishing.", zap.Uint64("last", lastId), zap.Uint64("max", maxId))
|
||||||
|
|
||||||
continue
|
return nil
|
||||||
|
} else {
|
||||||
|
zap.L().Error("Prematurely received empty chunk, aborting", zap.Uint64("last", lastId), zap.Uint64("max", maxId))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
err = importTorrent(goDb, torrent, files)
|
for _, piece := range chunk {
|
||||||
if err != nil {
|
lastId = piece.Torrent.ID
|
||||||
zap.L().Error("Error when importing torrent to Go database.", zap.String("hash", hex.EncodeToString(torrent.InfoHash)), zap.Error(err))
|
progress.Increment()
|
||||||
|
|
||||||
|
err = importTorrent(goDb, piece.Torrent, piece.Files)
|
||||||
|
if err != nil {
|
||||||
|
zap.L().Error("Error when importing torrent to Go database.", zap.String("hash", hex.EncodeToString(piece.Torrent.InfoHash)), zap.Error(err))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -2,11 +2,18 @@ package old
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"database/sql"
|
"database/sql"
|
||||||
|
"encoding/hex"
|
||||||
"fmt"
|
"fmt"
|
||||||
"github.com/skobkin/magnetico/pkg/persistence"
|
"github.com/skobkin/magnetico/pkg/persistence"
|
||||||
|
"go.uber.org/zap"
|
||||||
"net/url"
|
"net/url"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type TorrentData struct {
|
||||||
|
Torrent persistence.TorrentMetadata
|
||||||
|
Files []persistence.File
|
||||||
|
}
|
||||||
|
|
||||||
func OpenSqliteDb(url_ url.URL) (*sql.DB, error) {
|
func OpenSqliteDb(url_ url.URL) (*sql.DB, error) {
|
||||||
url_.Scheme = ""
|
url_.Scheme = ""
|
||||||
return sql.Open("sqlite3", url_.String())
|
return sql.Open("sqlite3", url_.String())
|
||||||
|
@ -36,6 +43,69 @@ func GetNumberOfTorrents(db sql.DB) (uint, error) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func GetLastTorrentId(db sql.DB) (uint64, error) {
|
||||||
|
rows, err := db.Query("SELECT MAX(id) FROM torrents;")
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
defer rows.Close()
|
||||||
|
|
||||||
|
if rows.Next() != true {
|
||||||
|
return 0, fmt.Errorf("No rows returned from `SELECT MAX(id)`")
|
||||||
|
}
|
||||||
|
|
||||||
|
var n *uint64
|
||||||
|
if err = rows.Scan(&n); err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// If the database is empty (i.e. 0 entries in 'torrents') then the query will return nil.
|
||||||
|
if n == nil {
|
||||||
|
return 0, nil
|
||||||
|
} else {
|
||||||
|
return *n, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func GetTorrentsChunk(db sql.DB, fromId uint64, size uint64) ([]TorrentData, error) {
|
||||||
|
chunk := make([]TorrentData, 0)
|
||||||
|
|
||||||
|
// Selecting torrents
|
||||||
|
tRows, err := db.Query(`
|
||||||
|
SELECT id, info_hash, name, total_size, discovered_on
|
||||||
|
FROM torrents
|
||||||
|
WHERE id > ?
|
||||||
|
ORDER BY id ASC
|
||||||
|
LIMIT ?;
|
||||||
|
`, fromId, size)
|
||||||
|
if err != nil {
|
||||||
|
zap.L().Fatal("Error when querying torrents from old database.")
|
||||||
|
}
|
||||||
|
defer tRows.Close()
|
||||||
|
|
||||||
|
for tRows.Next() {
|
||||||
|
var torrent persistence.TorrentMetadata
|
||||||
|
|
||||||
|
err := tRows.Scan(&torrent.ID, &torrent.InfoHash, &torrent.Name, &torrent.Size, &torrent.DiscoveredOn)
|
||||||
|
if err != nil {
|
||||||
|
zap.L().Error("Error scanning torrent row.", zap.Error(err))
|
||||||
|
|
||||||
|
return chunk, err
|
||||||
|
}
|
||||||
|
|
||||||
|
files, err := GetFilesForTorrent(db, torrent)
|
||||||
|
if err != nil {
|
||||||
|
zap.L().Error("Error getting files for torrent.", zap.String("hash", hex.EncodeToString(torrent.InfoHash)), zap.Error(err))
|
||||||
|
|
||||||
|
return chunk, err
|
||||||
|
}
|
||||||
|
|
||||||
|
chunk = append(chunk, TorrentData{Torrent: torrent, Files: files})
|
||||||
|
}
|
||||||
|
|
||||||
|
return chunk, nil
|
||||||
|
}
|
||||||
|
|
||||||
func GetFilesForTorrent(pyDb sql.DB, torrent persistence.TorrentMetadata) ([]persistence.File, error) {
|
func GetFilesForTorrent(pyDb sql.DB, torrent persistence.TorrentMetadata) ([]persistence.File, error) {
|
||||||
files := make([]persistence.File, 0)
|
files := make([]persistence.File, 0)
|
||||||
|
|
||||||
|
|
Reference in a new issue