proc replclose_noenv { queuedir } {
global queuedbs machids
set dbs [array names queuedbs]
foreach tofrom $dbs {
set handle $queuedbs($tofrom)
error_check_good db_close [$handle close] 0
unset queuedbs($tofrom)
}
set machids {}
}
proc replsetup_noenv { queuedir } {
global queuedbs machids
file mkdir $queuedir
set dbs [array names queuedbs]
foreach tofrom $dbs {
unset queuedbs($tofrom)
}
set machids {}
}
proc replsend_noenv { control rec fromid toid flags lsn } {
global is_repchild
global queuedbs machids
global drop drop_msg
global perm_sent_list
global anywhere
global qtestdir testdir
if { ![info exists qtestdir] } {
set qtestdir $testdir
}
set queuedir $qtestdir/MSGQUEUEDIR
set permflags [lsearch $flags "perm"]
if { [llength $perm_sent_list] != 0 && $permflags != -1 } {
lappend perm_sent_list $lsn
}
if { $drop != 0 } {
incr drop
if { $drop == $drop_msg } {
set drop 1
return 0
}
}
if { $toid == -1 } {
set machlist $machids
} else {
set m NULL
if { $anywhere != 0 } {
set anyflags [lsearch $flags "any"]
if { $anyflags != -1 } {
foreach m $machids {
if { $m == $fromid || $m == $toid } {
continue
}
set machlist [list $m]
break
}
}
}
if { $m == "NULL" } {
set machlist [list $toid]
}
}
foreach m $machlist {
if { $m == $fromid } {
continue
}
set pid [pid]
set db $queuedbs($m.$fromid.$pid)
set stat [catch {$db put -append [list $control $rec $fromid]} ret]
}
if { $is_repchild } {
replready_noenv $fromid from
}
return 0
}
proc replmsglen_noenv { machid {tf "to"}} {
global queuedbs qtestdir testdir
if { ![info exists qtestdir] } {
set qtestdir $testdir
}
set queuedir $qtestdir/MSGQUEUEDIR
set orig [pwd]
cd $queuedir
if { $tf == "to" } {
set msgdbs [glob -nocomplain ready.$machid.*]
} else {
set msgdbs [glob -nocomplain ready.*.$machid.*]
}
cd $orig
return [llength $msgdbs]
}
proc replclear_noenv { machid {tf "to"}} {
global queuedbs qtestdir testdir
if { ![info exists qtestdir] } {
set qtestdir $testdir
}
set queuedir $qtestdir/MSGQUEUEDIR
set orig [pwd]
cd $queuedir
if { $tf == "to" } {
set msgdbs [glob -nocomplain ready.$machid.*]
} else {
set msgdbs [glob -nocomplain ready.*.$machid.*]
}
foreach m $msgdbs {
file delete -force $m
}
cd $orig
set dbs [array names queuedbs]
foreach tofrom $dbs {
if { [string match $machid.* $tofrom] == 1 } {
set db $queuedbs($tofrom)
set dbc [$db cursor]
for { set dbt [$dbc get -first] } \
{ [llength $dbt] > 0 } \
{ set dbt [$dbc get -next] } {
error_check_good \
replclear($machid)_del [$dbc del] 0
}
error_check_good replclear($db)_dbc_close [$dbc close] 0
}
}
cd $queuedir
if { $tf == "to" } {
set msgdbs [glob -nocomplain temp.$machid.*]
} else {
set msgdbs [glob -nocomplain temp.*.$machid.*]
}
foreach m $msgdbs {
}
cd $orig
}
proc replready_noenv { machid tf } {
global queuedbs machids
global counter
global qtestdir testdir
if { ![info exists qtestdir] } {
set qtestdir $testdir
}
set queuedir $qtestdir/MSGQUEUEDIR
set pid [pid]
set dbs [array names queuedbs]
set closed {}
foreach tofrom $dbs {
set toidx [string first . $tofrom]
set toid [string replace $tofrom $toidx end]
set fidx [expr $toidx + 1]
set fromidx [string first . $tofrom $fidx]
set fromid [string replace $tofrom $fromidx end]
set fromid [string replace $fromid 0 $toidx]
if { ($tf == "to" && $machid == $toid) || \
($tf == "from" && $machid == $fromid) } {
set nkeys [stat_field $queuedbs($tofrom) \
stat "Number of keys"]
if { $nkeys != 0 } {
lappend closed \
[list $toid $fromid temp.$tofrom]
error_check_good temp_close \
[$queuedbs($tofrom) close] 0
}
}
}
set cwd [pwd]
foreach filename $closed {
set toid [lindex $filename 0]
set fromid [lindex $filename 1]
set fname [lindex $filename 2]
set tofrom [string replace $fname 0 4]
incr counter($machid)
cd $queuedir
file rename -force $fname ready.$tofrom.$counter($machid)
cd $cwd
replsetuptempfile_noenv $toid $fromid $queuedir
}
}
proc repladd_noenv { machid } {
global queuedbs machids counter qtestdir testdir
if { ![info exists qtestdir] } {
set qtestdir $testdir
}
set queuedir $qtestdir/MSGQUEUEDIR
if { [info exists machids] } {
if { [lsearch -exact $machids $machid] >= 0 } {
error "FAIL: repladd_noenv: machid $machid already exists."
}
}
set counter($machid) 0
lappend machids $machid
replcreatetofiles_noenv $machid $queuedir
replcreatefromfiles_noenv $machid $queuedir
}
proc replcreatetofiles_noenv { toid queuedir } {
global machids
foreach m $machids {
if { $m == $toid } {
continue
}
replsetuptempfile_noenv $toid $m $queuedir
}
}
proc replcreatefromfiles_noenv { fromid queuedir } {
global machids
foreach m $machids {
if { $m == $fromid } {
continue
}
replsetuptempfile_noenv $m $fromid $queuedir
}
}
proc replsetuptempfile_noenv { to from queuedir } {
global queuedbs
set pid [pid]
set queuedbs($to.$from.$pid) [berkdb_open -create -excl -recno\
-renumber $queuedir/temp.$to.$from.$pid]
error_check_good open_queuedbs [is_valid_db $queuedbs($to.$from.$pid)] TRUE
}
proc replprocessqueue_noenv { dbenv machid { skip_interval 0 } { hold_electp NONE } \
{ dupmasterp NONE } { errp NONE } } {
global errorCode
global perm_response_list
global qtestdir testdir
if { [string compare $hold_electp NONE] != 0 } {
upvar $hold_electp hold_elect
}
set hold_elect 0
if { [string compare $dupmasterp NONE] != 0 } {
upvar $dupmasterp dupmaster
}
set dupmaster 0
if { [string compare $errp NONE] != 0 } {
upvar $errp errorp
}
set errorp 0
set nproced 0
set queuedir $qtestdir/MSGQUEUEDIR
set cwd [pwd]
cd $queuedir
set msgdbs [glob -nocomplain ready.$machid.*]
cd $cwd
foreach msgdb $msgdbs {
set db [berkdb_open $queuedir/$msgdb]
set dbc [$db cursor]
error_check_good process_dbc($machid) \
[is_valid_cursor $dbc $db] TRUE
for { set dbt [$dbc get -first] } \
{ [llength $dbt] != 0 } \
{ set dbt [$dbc get -next] } {
set data [lindex [lindex $dbt 0] 1]
set recno [lindex [lindex $dbt 0] 0]
if { $skip_interval != 0 } {
if { $nproced % $skip_interval == 1 } {
incr nproced
set dbt [$dbc get -next]
continue
}
}
error_check_good queue_remove [$dbc del] 0
set ret [catch {$dbenv rep_process_message \
[lindex $data 2] [lindex $data 0] \
[lindex $data 1]} res]
if { [llength $perm_response_list] != 0 && \
([is_substr $res ISPERM] || [is_substr $res NOTPERM]) } {
lappend perm_response_list $res
}
if { $ret != 0 } {
if { [string compare $errp NONE] != 0 } {
set errorp "$dbenv $machid $res"
} else {
error "FAIL:[timestamp]\
rep_process_message returned $res"
}
}
incr nproced
if { $ret == 0 } {
set rettype [lindex $res 0]
set retval [lindex $res 1]
if { [is_substr $rettype HOLDELECTION] } {
set hold_elect 1
}
if { [is_substr $rettype DUPMASTER] } {
set dupmaster "1 $dbenv $machid"
}
if { [is_substr $rettype NOTPERM] || \
[is_substr $rettype ISPERM] } {
set lsnfile [lindex $retval 0]
set lsnoff [lindex $retval 1]
}
}
if { $errorp != 0 } {
break
}
if { $hold_elect == 1 } {
break
}
if { $dupmaster == 1 } {
break
}
}
error_check_good dbc_close [$dbc close] 0
set nkeys [stat_field $db stat "Number of keys"]
error_check_good db_close [$db close] 0
if { $nkeys == 0 } {
set dbname [string replace $msgdb 0 5 done.]
file rename -force $queuedir/$msgdb $queuedir/$dbname
file delete -force $queuedir/$dbname
}
}
return $nproced
}